Monday, June 15, 2015

The Reactive-Streams API (part 3)

Introduction


In this episode, I'm going to talk about how rx.Subscriber's ability to have resources registered with it can be carried over to the Reactive-Streams API. Since RS doesn't specify any means of resource management, we need to introduce the containers ourselves (renamed from their rx.Subscription counterparts) and attach them to the RS Subscriber's cancellation logic in some way.

Subscription vs. Subscription

To avoid confusion, RxJava 2.0 will replace the XXXSubscription classes and interfaces with XXXDisposable ones. I'm not going to detail these classes here; I'll only show the new base interfaces for resource management:

interface Disposable {
    boolean isDisposed();
    void dispose();
}

interface DisposableCollection extends Disposable {
    boolean add(Disposable resource);
    boolean remove(Disposable resource);
    boolean removeSilently(Disposable resource);

    void clear();
    boolean hasDisposables();
    boolean contains(Disposable resource);
}


The usage rules remain the same: implementations must be thread-safe and idempotent.
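To make the two rules concrete, here is a minimal sketch of a list-based container. The class name SimpleDisposableList and the helper CountingDisposable are made up for this illustration, and the difference between remove() (dispose the removed resource) and removeSilently() (just drop it) is my assumption about the intended semantics, not a confirmed design.

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the base interface shown above.
interface Disposable {
    boolean isDisposed();
    void dispose();
}

// Hypothetical list-based container, for illustration only.
final class SimpleDisposableList implements Disposable {
    private List<Disposable> resources = new ArrayList<>();
    private boolean disposed;

    // Adds the resource, or disposes it right away if the container is already disposed.
    public boolean add(Disposable resource) {
        synchronized (this) {
            if (!disposed) {
                resources.add(resource);
                return true;
            }
        }
        resource.dispose();
        return false;
    }

    // Assumed semantics: removes the resource and disposes it.
    public boolean remove(Disposable resource) {
        if (removeSilently(resource)) {
            resource.dispose();
            return true;
        }
        return false;
    }

    // Assumed semantics: removes the resource without disposing it.
    public boolean removeSilently(Disposable resource) {
        synchronized (this) {
            return !disposed && resources.remove(resource);
        }
    }

    @Override
    public synchronized boolean isDisposed() {
        return disposed;
    }

    @Override
    public void dispose() {
        List<Disposable> toDispose;
        synchronized (this) {
            if (disposed) {
                return; // idempotent: later calls are no-ops
            }
            disposed = true;
            toDispose = resources;
            resources = null;
        }
        // Dispose outside the lock so a resource can't deadlock against the container.
        for (Disposable d : toDispose) {
            d.dispose();
        }
    }
}

// Helper for experiments: counts how many times dispose() was invoked on it.
final class CountingDisposable implements Disposable {
    private int calls;
    @Override
    public synchronized boolean isDisposed() { return calls > 0; }
    @Override
    public synchronized void dispose() { calls++; }
    public synchronized int calls() { return calls; }
}
```

Calling dispose() twice on the list disposes each contained resource only once, and a resource added after disposal is disposed immediately instead of being retained.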


The DisposableSubscription

The most basic way of adding resource management is to wrap around an existing Subscription with one that hijacks the cancel() method and calls dispose() on the underlying composite:

public final class DisposableSubscription
implements Disposable, Subscription {
    final Subscription actual;
    final DisposableCollection collection;
    public DisposableSubscription(
            Subscription actual, 
            DisposableCollection collection) {
        this.actual = Objects.requireNonNull(actual);
        this.collection = Objects.requireNonNull(collection);
    }
    public boolean add(Disposable resource) {
        return collection.add(resource);
    }
    public boolean remove(Disposable resource) {
        return collection.remove(resource);
    }
    public boolean removeSilently(Disposable resource) {
        return collection.removeSilently(resource);
    }
    @Override
    public boolean isDisposed() {
        return collection.isDisposed();
    }
    @Override
    public void dispose() {
        cancel();
    }
    
    @Override
    public void cancel() {
        collection.dispose();
        actual.cancel();
    }
    @Override
    public void request(long n) {
        actual.request(n);
    }
}

By implementing the Disposable interface, the DisposableSubscription itself can now be added to a disposable container and participate in a complex dispose network. However, most of the time you'd want to avoid the extra allocation, so the structure above will likely be inlined into some other class, perhaps into the Subscriber participating in a lift() call.

If you remember the RxJava specification and pitfall #2, you shouldn't unsubscribe your downstream, because it is likely you'll trigger a premature release of its resources.

(This is currently somewhat of an issue with RxAndroid's LifecycleObservable, where one can attach a takeUntil()-like operator in the middle of a chain which, instead of sending out onCompleted(), unsubscribes its downstream subscriber.)

With RS, such downstream unsubscription is practically infeasible. Each level can either pass the Subscription along as-is (and thus can't attach resources to it) or wrap it with a DisposableSubscription-like class and forward that downstream as a plain Subscription. If you call cancel() at your level, the call can't reach the cancel() of any class further downstream that wraps your Subscription.

As always, you can still shoot yourself in the foot, but it takes much more effort than in RxJava today, and the rule won't change: you shouldn't try to cancel/dispose your downstream's resources or share them across the chain of operators.
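The one-directional nature of wrapping can be shown with a tiny sketch. The Subscription interface below is a local stand-in for the RS one, and UpstreamSubscription, ResourceWrapper and WrapperDemo are hypothetical names used only for this demonstration.

```java
// Local stand-in for org.reactivestreams.Subscription so the sketch compiles on its own.
interface Subscription {
    void request(long n);
    void cancel();
}

// What an operator receives from its upstream.
final class UpstreamSubscription implements Subscription {
    boolean cancelled;
    @Override
    public void request(long n) { }
    @Override
    public void cancel() { cancelled = true; }
}

// DisposableSubscription-like wrapper that a level further down puts around the Subscription.
final class ResourceWrapper implements Subscription {
    final Subscription actual;
    boolean resourceReleased;
    ResourceWrapper(Subscription actual) { this.actual = actual; }
    @Override
    public void request(long n) { actual.request(n); }
    @Override
    public void cancel() {
        resourceReleased = true; // stands for disposing the attached resources
        actual.cancel();
    }
}

final class WrapperDemo {
    public static void main(String[] args) {
        UpstreamSubscription up = new UpstreamSubscription();
        ResourceWrapper wrapper = new ResourceWrapper(up); // this is what downstream sees

        up.cancel(); // the operator cancels at its own level
        System.out.println(wrapper.resourceReleased); // prints "false"

        wrapper.cancel(); // only the downstream side can release the wrapper's resources
        System.out.println(wrapper.resourceReleased); // prints "true"
    }
}
```

Cancellation travels from the wrapper towards the upstream, never the other way around, which is why an operator can no longer release its downstream's resources by accident.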


TakeUntil

Now let's see how one can implement the takeUntil() operator with such externalized resource management (split into code parts for better readability):

public final class OperatorTakeUntil<T> 
implements Operator<T, T> {
    final Publisher<?> other;
    public OperatorTakeUntil(Publisher<?> other) {
        this.other = Objects.requireNonNull(other);
    }
    @Override
    public Subscriber<? super T> call(
            Subscriber<? super T> child) {
        Subscriber<T> serial = 
            new SerializedSubscriber<>(child);

        SubscriptionArbiter arbiter = 
            new SubscriptionArbiter();                       // (1)
        serial.onSubscribe(arbiter);
        
        SerialDisposable sdUntil = new SerialDisposable();   // (2)
        SerialDisposable sdParent = new SerialDisposable();  // (3)

So far, it looks similar to the current RxJava implementation: we wrap the child to serialize out the potentially asynchronous onError() or onCompleted() emitted by the other Publisher.

We create a SubscriptionArbiter (a conversion of ProducerArbiter) for the following reason: let's assume we are still in the call() method when the other source, already subscribed to, emits a notification of any kind. We'd like to forward it to the child Subscriber; however, unless the child has a Subscription available, we can't call onXXX methods on it just yet. Such a 'real' subscription through the main path will only arrive once the operator chain starts to call onSubscribe(). I'll talk about this in more detail in the next blog post.
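To give a rough idea of what such an arbiter has to do, here is a heavily simplified, lock-based sketch. It is not the actual SubscriptionArbiter: the names SimpleSubscriptionArbiter and RecordingSubscription are invented here, the Subscription interface is a local stand-in for the RS one, and the sketch only handles the arrival of a single 'real' Subscription.

```java
// Local stand-in for org.reactivestreams.Subscription so the sketch compiles on its own.
interface Subscription {
    void request(long n);
    void cancel();
}

// Simplified arbiter: remembers demand and cancellation until the real Subscription shows up.
final class SimpleSubscriptionArbiter implements Subscription {
    private Subscription actual;  // the 'real' Subscription, once known
    private long missedRequested; // demand accumulated while actual == null
    private boolean cancelled;

    @Override
    public void request(long n) {
        if (n <= 0) {
            return; // simplification: the RS specification requires signalling an error here
        }
        Subscription s;
        synchronized (this) {
            if (cancelled) {
                return;
            }
            s = actual;
            if (s == null) {
                long r = missedRequested + n;
                missedRequested = r < 0 ? Long.MAX_VALUE : r; // cap on overflow
                return;
            }
        }
        s.request(n);
    }

    @Override
    public void cancel() {
        Subscription s;
        synchronized (this) {
            if (cancelled) {
                return;
            }
            cancelled = true;
            s = actual;
            actual = null;
        }
        if (s != null) {
            s.cancel();
        }
    }

    // Hands over the real Subscription and replays what happened in the meantime.
    public void setSubscription(Subscription s) {
        long replay;
        synchronized (this) {
            if (cancelled) {
                replay = -1;
            } else {
                actual = s;
                replay = missedRequested;
                missedRequested = 0;
            }
        }
        if (replay < 0) {
            s.cancel();
        } else if (replay > 0) {
            s.request(replay);
        }
    }
}

// Helper for experiments: records what it was asked to do.
final class RecordingSubscription implements Subscription {
    long requested;
    boolean cancelled;
    @Override
    public void request(long n) { requested += n; }
    @Override
    public void cancel() { cancelled = true; }
}
```

With this in place, the child can be given the arbiter immediately; requests made before the main Subscription arrives are summed up and replayed, and an early cancel() is passed on as soon as there is something to cancel.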

Because of the way the cancellation option is presented to Subscribers, we need a way (2) to pass the Subscription from the other Subscriber back to the parent Subscriber, so that if either of them reaches a terminal state, it can cancel() the other one. Since the other may get its Subscription and be cancelled before the parent does, we need to be prepared to cancel the parent's Subscription on reception as well (3).

            // ...
        Subscriber<T> parent = new Subscriber<T>() {
            DisposableSubscription dsub;
            @Override
            public void onSubscribe(Subscription s) {
                dsub = 
                        new DisposableSubscription(s, 
                        new DisposableList());              // (1)
                dsub.add(sdUntil);                          // (2)
                sdParent.set(dsub);
                arbiter.setSubscription(dsub);              // (3)
            }
            @Override
            public void onNext(T t) {
                serial.onNext(t);
            }
            @Override
            public void onError(Throwable t) {
                serial.onError(t);
                sdParent.dispose();                         // (4)
            }
            @Override
            public void onComplete() {
                serial.onComplete();
                sdParent.dispose();
            }
        };


The parent Subscriber is slightly different: we need to handle the incoming Subscription and wire up the cancellation network:

  1. We create a DisposableSubscription wrapper with a List-based underlying collection.
  2. We add the SerialDisposable, which will hold the Disposable wrapping the other Subscription, to this composite. We also set the DisposableSubscription on sdParent so that the other Subscriber can finish before the parent one has even started.
  3. We hand the wrapper to the arbiter, which the serializer already holds as its Subscription.
  4. In case an error or completion event happens, we'll make sure the composite is cancelled/disposed on the spot. Since the composite holds a reference to the other Subscriber's subscription, this will also cancel that stream.

As the final segment, we need to create a Subscriber to the other stream and ensure it completes the parent:

        // ...
        Subscriber<Object> until = new Subscriber<Object>() {
            @Override
            public void onSubscribe(Subscription s) {
                sdUntil.set(Disposables.create(s::cancel));    // (1)
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Object t) {
                parent.onComplete();                           // (2)
            }
            @Override
            public void onError(Throwable t) {
                parent.onError(t);
            }
            @Override
            public void onComplete() {
                parent.onComplete();
            }
        };
        
        this.other.subscribe(until);
        
        return parent;
    }
}

We receive the Subscription from the other source (1) and wrap it into a Disposable (similar to how RxJava's Subscription.create() does this now). Any item from the other source (2) is treated the same way as its completion: it completes the parent. Due to the terminal nature of Disposables, even if the main chain completes before the other chain even has the chance to receive a Subscription, the SerialDisposable will become disposed and will immediately dispose the other's Subscription.
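The 'terminal nature' is easy to miss, so here is a minimal sketch of it. ActionDisposable is a hypothetical stand-in for whatever Disposables.create() returns, and SimpleSerialDisposable is a simplified, assumed version of a serial container; neither is the actual RxJava 2.0 class.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Mirrors the base interface from the beginning of the post.
interface Disposable {
    boolean isDisposed();
    void dispose();
}

// Hypothetical stand-in for Disposables.create(action): runs the action at most once.
final class ActionDisposable implements Disposable {
    private final AtomicBoolean once = new AtomicBoolean();
    private final Runnable action;
    ActionDisposable(Runnable action) { this.action = action; }
    @Override
    public boolean isDisposed() { return once.get(); }
    @Override
    public void dispose() {
        if (once.compareAndSet(false, true)) {
            action.run();
        }
    }
}

// Simplified serial container: holds one Disposable at a time and is terminal once disposed.
final class SimpleSerialDisposable implements Disposable {
    // Sentinel marking the disposed state.
    private static final Disposable DISPOSED = new ActionDisposable(() -> { });
    private final AtomicReference<Disposable> current = new AtomicReference<>();

    // Replaces the held Disposable; if already disposed, the new one is disposed at once.
    public void set(Disposable next) {
        for (;;) {
            Disposable c = current.get();
            if (c == DISPOSED) {
                if (next != null) {
                    next.dispose();
                }
                return;
            }
            if (current.compareAndSet(c, next)) {
                if (c != null) {
                    c.dispose(); // the previous content is released
                }
                return;
            }
        }
    }

    @Override
    public boolean isDisposed() { return current.get() == DISPOSED; }

    @Override
    public void dispose() {
        Disposable c = current.getAndSet(DISPOSED);
        if (c != null && c != DISPOSED) {
            c.dispose();
        }
    }
}
```

If dispose() wins the race, a later set(new ActionDisposable(s::cancel)) cancels s on the spot, which is exactly what lets the main chain finish before the other chain has been subscribed.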

Note that since the ability to cancel and thus dispose resources depends on timing and ordering, one will usually end up needing some disposable container up front (i.e., sdParent, sdUntil) so that, regardless of which Subscription arrives when, they can cancel each other out as necessary.


TakeUntil v2

Looking at our takeUntil() implementation, one can discover that by reorganizing the various Subscriptions, it is possible to clean up the mess the Disposables have caused.

    @Override
    // ...
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        Subscriber<T> serial = new SerializedSubscriber<>(child);

        SubscriptionArbiter sa = new SubscriptionArbiter();        // (1)

        DisposableSubscription dsub = 
            new DisposableSubscription(sa, new DisposableList());  // (2)
        
        serial.onSubscribe(dsub);                                  // (3)
        
        Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                dsub.add(Disposables.create(s::cancel));           // (4)
                sa.setSubscription(s);                             // (5)
            }
            // ...
        };
        
        Subscriber<Object> until = 
        new Subscriber<Object>() {
            @Override
            public void onSubscribe(Subscription s) {
                dsub.add(Disposables.create(s::cancel));           // (6)
                s.request(Long.MAX_VALUE);
            }
            // ...

It works as follows:

  1. We create the SubscriptionArbiter as before,
  2. then it is wrapped by a DisposableSubscription,
  3. which is then pushed downstream. The pair will make sure cancellation and requests are accumulated until the arbiter receives a 'real' Subscription.
  4. Once the main receives its Subscription, we turn it into a Disposable and add it to the dsub composite,
  5. followed by updating the arbiter's current subscription: any accumulated requests and any cancellation is now 'replayed' to the upstream Subscription.
  6. Once the other source sends its Subscription, we turn it into a Disposable as well and request everything.

Naturally, the parent will now call dsub.dispose() in its onError() and onComplete() methods.

Let's think about the various cancellation paths:

  • Downstream's cancellation: it will cancel the dsub, dsub will cancel the arbiter, the arbiter will immediately cancel any Subscription it receives.
  • Main completion: it will cancel the dsub, dsub will cancel the arbiter and the arbiter will cancel the upstream's Subscription. In addition, dsub will immediately cancel the other's subscription if added.
  • Other completion: it will cancel the dsub, dsub will cancel the arbiter. Once the main receives a Subscription, either dsub or the arbiter will cancel it immediately.
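The third path (other completes before the main has a Subscription) can be traced with a stripped-down sketch. All classes below are simplified stand-ins I made up for this walkthrough: CancelOnlyArbiter omits request accumulation, Dsub stores plain Runnable cancel actions instead of Disposables, and Subscription is a local copy of the RS interface.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for org.reactivestreams.Subscription so the sketch compiles on its own.
interface Subscription {
    void request(long n);
    void cancel();
}

// Counts how often cancel() was called on it.
final class CountingSubscription implements Subscription {
    int cancels;
    @Override
    public void request(long n) { }
    @Override
    public void cancel() { cancels++; }
}

// The arbiter reduced to its cancellation behaviour (request accumulation omitted).
final class CancelOnlyArbiter implements Subscription {
    private Subscription actual;
    private boolean cancelled;
    @Override
    public void request(long n) { }
    @Override
    public void cancel() {
        Subscription s;
        synchronized (this) {
            if (cancelled) {
                return;
            }
            cancelled = true;
            s = actual;
        }
        if (s != null) {
            s.cancel();
        }
    }
    public void setSubscription(Subscription s) {
        synchronized (this) {
            if (!cancelled) {
                actual = s;
                return;
            }
        }
        s.cancel(); // already cancelled: cancel whatever arrives
    }
}

// Stand-in for dsub: a composite of cancel actions wrapped around the arbiter.
final class Dsub implements Subscription {
    private final Subscription actual;
    private List<Runnable> resources = new ArrayList<>();
    Dsub(Subscription actual) { this.actual = actual; }
    public void add(Runnable cancelAction) {
        synchronized (this) {
            if (resources != null) {
                resources.add(cancelAction);
                return;
            }
        }
        cancelAction.run(); // composite already disposed
    }
    @Override
    public void request(long n) { actual.request(n); }
    @Override
    public void cancel() {
        List<Runnable> list;
        synchronized (this) {
            list = resources;
            resources = null;
        }
        if (list != null) {
            for (Runnable r : list) {
                r.run();
            }
        }
        actual.cancel();
    }
}
```

If the other source is added and then triggers dsub.cancel(), a main Subscription arriving afterwards is cancelled both by dsub.add() and by setSubscription() in this sketch. That is harmless as long as cancel() is idempotent, as the usage rules demand.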

Conclusion

In this blog post, I've talked about how resource management can be performed with respect to Reactive-Streams' Subscribers and Subscriptions and shown an example implementation of a takeUntil() operator.

Although it looks like we have to allocate at least as many objects as in RxJava, note that many operators don't really require resource management themselves (example: take()) or don't even need to wrap the Subscription instance in any way (example: map()).

In the next and final post about the RS API, I'm going to talk about the increased need for various arbiter classes (briefly mentioned in the takeUntil() example), because one can't leave a Subscriber without a Subscription and expect cancellation to work and one can't call onSubscribe() multiple times either if the source changes.
