21 Mayıs 2020 Perşembe

RxJava Observable Sınıfı

Giriş
RxJava, ReactiveX kavramının Java gerçekleştirimi. Şu satırı dahil ederiz.
<dependency>
  <groupId>io.reactivex</groupId>
  <artifactId>rxjava</artifactId>
  <version>1.3.2</version>
</dependency>
Gradle ile şöyle yaparız.
dependencies {
  ...
  compile 'io.reactivex:rxandroid:1.2.1'
  compile 'io.reactivex:rxjava:1.2.1'
}
ReactiveStreams (RxJava 2.0) için şöyle yaparız.
compile('io.reactivex:rxjava-reactive-streams')
Olayı basit tutmak için RxJava'nın amacını asenkron bir işlemi gerçekleştirip bunun sonucunu UI'da göstermek diye düşünelim. Klasik programlama dillerinde asenkron işlem tüm cevabı hazırlayıp gönderir.

RxJava ile cevap peyderpey gönderilebilir. Böylece UI ara ara güncellenerek en son durum görüntülebilir.


Back Pressure
Back Pressure ile ilgili detaylı örnekler burada.

Java 8 Stream İle Farkı Nedir?
Açıklaması şöyle.
Java 8 Streams are pull based. You iterate over a java 8 stream consuming each item. And it could be an endless stream.

RXJava Observable is by default push based. You subscribe to an Observable and you will get notified when the next item arrives (onNext), or when the stream is completed (onCompleted), or when an error occured (onError).
Observable Benzeri Sınıflar
Single,Completable, Maybe gibi benzer metodları sunan sınıflar da var. Açıklaması şöyle.
Completable, Single and Maybe have by design no-need for backpressure support, they will offer a rich API as well and defer any workload until subscribed.

buffer metodu
Satırları 4'er okumak için şöyle yaparız.
Stream<String> stream = Files.lines(Paths.get("file.txt"))

Observable.fromIterable(stream::iterator)
          .buffer(4)                      // Observable<List<String>>
          .map(x -> String.join(", ", x)) // Observable<String>
          .forEach(System.out::println);
create metodu
Observable.create metodu yazısına taşıdım

concatMap metodu
Observable.concatMap metodu yazısına taşıdım

doOnNext metodu
Şöyle yaparız.
Observable.just(new Object())
  .doOnNext(o -> { /* /* This method catches the on next but doesn't consume it. */})
  .doOnComplete(() -> { /* test */})
  .doOnError(throwable -> {/* This method catches the error but doesn't consume it. */})
  .subscribe(o ->
             {/* success */}
             ,
             throwable -> {/* error */} // here error will be consumed at the end
  );
Şöyle yaparız.
Observable.just(new Object())
  .doOnNext(o -> { /* /* This method catches the on next but doesn't consume it. */})
  .doOnComplete(() -> { /* test */})
  .doOnError(throwable -> {/* This method catches the error but doesn't consume it. */})
  .subscribe(
    new DefaultObserver<Object>() {
      @Override
      public void onNext(Object o) {

      }

      @Override
      public void onError(Throwable e) {
        // here error will be consumed at the end
      }

      @Override
      public void onComplete() {

      }
    }
  );
defer metodu
İmzası şöyle.
public final static <T> Observable<T> defer(Func0<Observable<T>> observableFactory)...
Açıklaması şöyle.
Returns an Observable that calls an Observable factory to create an Observable for each new Observer that subscribes. That is, for each subscriber, the actual Observable that subscriber observes is determined by the factory function.
Bir abone geldiğinde defer() metoduna verilen Func0 çağrılır. Func0'in imzası şöyle.
public interface Func0<R> extends Function, Callable<R> {
    @Override
    public R call();
}
Kolay olsun diye şöyle düşünülebilir.
interface Func0<R> {
    R call();
}
Şöyle yaparız.
Observable.defer(new Func0<Observable<String>>() {
  @Override
  public Observable<String> call() {
    try {
      return Observable.just(Database.readValue());       
    }
    catch(IOException e) {
      return Observable.error(e);     
    }   
  })
  .subscribe(new Action1<String>() {
    @Override
    public void call(String result) {
      resultTextView.setText(result);     
    }   
  }
}
flatMapSequential  metodu
Observable.flatMapSequential yazısına taşıdım

from metodu
Örnek ver.

fromIterable metodu
Şöyle yaparız.
List<Form> form = ...;
Observable.fromIterable(form)
.concatMapIterable(Form::getFormVersions) // get form version list from single form object
.doOnSubscribe(disposable -> AppLogger.i(tag, "Form Versions download is subscribed"))
.filter(this::checkFormVersionToDownloadOrNot) //get those versions to be downloaded
.doOnNext(formVersion -> { // get formVersion object

  AppLogger.i(tag, "download this form ---------> " + formVersion.getFormUrl());
  AppLogger.i(tag, formVersion.getFormUrl());
  String constructURL = formVersion.getFormUrl();
  /* download form gzip process starts from here */
  ...
})
.doOnTerminate(() -> AppLogger.i(tag, "Form Versions terminated"))
.doOnError(Throwable::printStackTrace)
.subscribe(
  formVersion -> AppLogger.i(tag, "Form Versions download completed"),
  throwable -> AppLogger.e(tag, throwable.getMessage(), throwable)
 );
just metodu
Örnek
Tek bir nesne dönmek için şöyle yaparız.
Observable.just("item").subscribe(
  new Observer<String>() {
  @Override
  public void onSubscribe(Disposable d) {
   ...
  }

  @Override
  public void onNext(String s) {
    ...
  }

  @Override
  public void onError(Throwable e) {
    ...
  }

  @Override
  public void onComplete() {
    ...
  }
})
Örnek
Liste kullanmak için şöyle yaparız.
List<Foo> list = ...;
Observable.just(list).flatMapIterable(i -> i).groupBy(f -> f.Id)
Örnek
Dizi kullanmak için şöyle yaparız.
String[] strings = {"Hello", "World"};
Observable<String[]> stringsObservable = Observable.just(strings);
merge metodu
Şöyle yaparız.
public static Observable<ResultSet> queryAllAsObservable(Session session, String query,
  Object... partitionKeys) {
  List<ResultSetFuture> futures = sendQueries(session, query, partitionKeys);
  Scheduler scheduler = Schedulers.io();
  List<Observable<ResultSet>> observables = Lists.transform(futures,
    (ResultSetFuture future) -> Observable.fromFuture(future, scheduler));
  return Observable.merge(observables);
}
range metodu
Elimizde şöyle bir sınıf olsun.
public class MyClass {
  private int number;

  public MyClass(int number) {
    super();
    this.number = number;
  }

  public int getNumber() {
    return number;
  }

}
Bu sınıftan 50 tane farklı sayı ile ilklendirmek için şöyle yaparız.
 Observable.range(1, 10).map(number -> new MyClass(number)).toList()
  .subscribe(myClasses -> printData(myClasses));
Nesne listesi oluşturulduktan sonra şu metod çağrılır.
static void printData(List<MyClass> mydata) {
  mydata.stream().forEach(myClass -> System.out.println(myClass.getNumber()));
}
subscribe metodu
İmzası şöyle.
public final Subscription subscribe(final Action1<? super T> onNext)...
Açıklaması şöyle.
Subscribes to an Observable and provides a callback to handle the items it emits.
Action1'in imzası şöyle.
public interface Action1<T1> extends Action {
  public void call(T1 t1);
}
Kolay olsun diye şöyle düşünülebilir.
interface Action1<T> {
    void call(T t);
}
Örnek
Şöyle yaparız.
Observable.just(1).subscribe(new Observer<Integer>() {

  Disposable disposable;

  @Override
  public void onSubscribe(Disposable disposable) {
    System.out.println("Subscribed");
    this.disposable = disposable;
  }

  @Override
  public void onNext(Integer integer) {
    System.out.println(integer);
    System.out.println(disposable.isDisposed());
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("Error");
    System.out.println(disposable.isDisposed());
  }

  @Override
  public void onComplete() {
    System.out.println("Complete");
    System.out.println(disposable.isDisposed());
  }
})
Örnek
Bu örnekte Observer yerine Subscriber kullanılıyor. Subscriber da Observer arayüzünden kalıtır.
public abstract class Subscriber<T> implements Observer<T>, Subscription
Şöyle yaparız.
Subscription subscription =  Observable.subscribe(new Subscriber<Type>() {
  @Override
  public void onCompleted() {
  }

  @Override
  public void onError(Throwable e) {
  }

  @Override
  public void onNext(String responseString) {
  }
});
subscribeOn metodu
Observable ve Threading yazsına taşıdım.

Observable.switchMap metodu
Observable.switchMap metodu yazsına taşıdım.

timer metodu
C# ile şöyle yaparız.
IDisposable subscription =
    Observable
        .Timer(TimeSpan.FromMilliseconds(10.0))
        .Subscribe(_ => { /* Do Stuff Here */ });
using metodu
Şöyle yaparız
Observable<String> observableFile2(Path path) {
  return Observable.using(
    () -> Files.newBufferedReader(path),
    reader -> Observable.fromIterable(() -> new Iterator<>()),
    BufferedReader::close
);
zip metod
Observable.zip metodu yazısına taşıdım.



Hiç yorum yok:

Yorum Gönder