RxJava Zip操作符详解

Posted by lizubing1992 on 12-13,2018

ObserableZip 针对解析类中的subscribeActual(Observer)
-->ZipCoordinator.subscribe()
-->ZipCoordinator.onNext()
-->ZipCoordinator#drain()
代码如下

public void drain() {
    final ZipObserver<T, R>[] zs = observers;
    final Observer<? super R> a = actual;
    // row 在我们前面提到过
    final T[] os = row;


    for (; ; ) {
        int i = 0;
        int emptyCount = 0;
        for (ZipObserver<T, R> z : zs) {
            if (os[i] == null) {
                boolean d = z.done;
                T v = z.queue.poll();
                boolean empty = v == null;

                if (!empty) {
                    os[i] = v;
                } else {
                    emptyCount++;
                }
            } else {
                // ...
            }
            i++;
        }

        if (emptyCount != 0) {
            break;
        }

        R v = zipper.apply(os.clone();

        a.onNext(v);

        Arrays.fill(os, null);
    }
}

底层的实现是多个队列和一个数组
时间发送过来,首先入队列,然后回查看每个元素是否为空
若空 就去指定的队列poll出队列
poll为null 队列中还没时间被发射出来,emptyCount++
不为null 则跳出次循环


0评论