如何在 rxjava 中在 oncomplete 调用时取消订阅

在 rxjava 中,通常希望在 observable 的 oncomplete 事件上取消订阅,以防止进一步的事件发射。这可以通过设置 completablefuture 来实现。

非 android 环境中使用 rxjava 2.x 的示例

flowable 由持久层方法返回,作为调用方中的 result.getall(dbname.get(), strategy) 和 result.gettablecolumn(table)。

立即学习“Java免费学习笔记(深入)”;

public class consoleschemaflowableoutput implements schemaflowableoutput {
    private static final logger logger = loggerfactory.getlogger(consoleschemaflowableoutput.class);
    private volatile completablefuture future = new completablefuture<>();
    private atomicinteger count = new atomicinteger(0);

    @override
    public disposable flush(information information, flowable table) throws schemaexportexception {
        disposable export_flush_complete = table.subscribe(tableins -> {
            system.out.println(printasciitable(tableins));
            system.out.println(printasciicolumns(tableins.getcolumns()));
            system.out.println("
");
            count.addandget(1);
        }, throwable -> {
            logger.debug("export break, reason: " + throwable.getmessage());
            future.cancel(true);
            throw new schemaexportexception(throwable);
        }, new action() {
            @override
            public void run() throws exception {
                logger.debug("export complete, affect size:" + count.get());
                future.complete("ok");
            }
        });
        return export_flush_complete;
    }

    @override
    public completablefuture getfuture() {
        return future;
    }
    ...
} 登录后复制 

调用方

public void export(Information info, SchemaFlowableOutput out) throws SchemaExportException {
    long startStamp = System.currentTimeMillis();
    // Flowable
    Flowable
tableFlowable = result.getAll(dbName.get(), strategy).flatMap(new Function
>() { @Override public Publisher
apply(@NonNull Table table) throws Exception { return result.getTableColumn(table).flatMap(new Function, SingleSource
>() { @Override public SingleSource
apply(@NonNull List columns) throws Exception { return Single.just(table.fillColumn(columns)); } }).flatMapPublisher(new Function
>() { @Override public Publisher<? extends Table> apply(@NonNull Table table) throws Exception { return Flowable.just(table); } }); } }); Disposable disposable = null; try { disposable = out.flush(info, tableFlowable); CompletableFuture future = out.getFuture(); while (!future.isDone()) { logger.info("[ERE-Flowable]未完成,线程休眠1秒"); Thread.currentThread().sleep(1000, 0); } String result = future.get(); logger.info("[ERE-Flowable]完成, 结果:" + result); if (result.equals("OK")) { long finishStamp = System.currentTimeMillis(); clearHander(disposable, "[ERE-Flowable]RxJava disposed because complete, WithTime: " + (finishStamp - startStamp)); } } catch (Exception e) { clearHander(disposable, "[ERE-Flowable]RxJava disposed has Exception: " + e.getMessage()); } } private void clearHander(Disposable disposable, String reason) { logger.info(reason); if (null != disposable && !disposable.isDisposed()) { disposable.dispose(); } else { if (null != disposable) { logger.info("[CH]disposable status:" + disposable.isDisposed()); } else { logger.info("[CH]disposable is null:"); } } // 结束后的回调,执行一些清理工作 completeHandler.apply(); } 登录后复制

通过将 completablefuture 设置为 "ok" 来通知调用方 observable 已结束,这将取消订阅并执行后续处理。

以上就是RxJava中如何优雅地在onComplete调用时取消订阅?的详细内容,更多请关注慧达安全导航其它相关文章!

点赞(0)

评论列表 共有 0 条评论

暂无评论