网易首页 > 网易号 > 正文 申请入驻

JDK9响应式流使用详解

0
分享至

上文中咱们简单提到了JDK9中Flow接口中的静态内部类实现了响应式流的JAVA API,并且提供了一个一个Publisher的实现类SubmissionPublisher。本文将先梳理一下接口中具体的处理流程,然后再以几个调用者的例子来帮助大家理解。

JDK9中的实现

再放上一下上文中的响应式流的交互流程:

  1. 订阅者向发布者发送订阅请求。
  2. 发布者根据订阅请求生成令牌发送给订阅者。
  3. 订阅者根据令牌向发布者发送请求N个数据。
  4. 发送者根据订阅者的请求数量返回M(M<=N)个数据
  5. 重复3,4
  6. 数据发送完毕后由发布者发送给订阅者结束信号

该流程的角度是以接口调用的交互来说的,而考虑实际的coding工作中,我们的调用流程其实为:

  1. 创建发布者
  2. 创建订阅者
  3. 订阅令牌交互
  4. 发送信息

接下来我们按照这个流程来梳理一下代码细节。

创建发布者

对于实现响应流的最开始的步骤,便是创建一个发布者。之前提到在JDK9中提供了一个发布者的简单实现SubmissionPublisher。SubmissionPublisher继承自Flow.Publisher,他有三种构造函数:

public SubmissionPublisher() {
this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
this(executor, maxBufferCapacity, null);
}
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
BiConsumer, ? super Throwable> handler)

SubmissionPublisher将使用Executor作为“线程池”向订阅者发送信息。如果需要需要设置线程池的话可以自己传入,否则的话再无参的构造函数中将默认使用ForkJoinPool类的commonPool()方法获取,即无餐构造方法中的ASYNC_POOL静态变量。

SubmissionPublisher会为每一个订阅者单独的建立一个缓冲空间,其大小由入参maxBufferCapacity决定。默认情况下直接使用Flow.defaultBufferSize()来设置,默认为256。如果缓冲区满了之后会根据发送信息时候的策略确定是阻塞等待还是抛弃数据。

SubmissionPublisher会在订阅者发生异常的时候(onNext处理中),会调用最后一个参数handler方法,然后才会取消订阅。默认的时候为null,也就是不会处理异常。

最简单的创建SubmissionPublisher的方法就是直接使用无参构造方法:

SubmissionPublisher
publisher = new SubmissionPublisher<>();

上文书说到,因为SubmissionPublisher实现了AutoCloseable接口,所以可以用try来进行资源回收可以省略close()的调用:

try (SubmissionPublisher
publisher = new SubmissionPublisher<>()){}

但是也可以手动的调用close()方法来显示的关闭发布者,关闭后再发送数据就会抛出异常:

if (complete) throw new IllegalStateException("Closed");
创建订阅者

上文中咱们没有手动创建订阅者,而是直接调用SubmissionPublisher中的consume方法使用其内部的订阅者来消费消息。在本节可以实现接口Flow.Subscriber

创建一个SimpleSubscriber类:

public class SimpleSubscriber implements Flow.Subscriber {
private Flow.Subscription subscription;
/**
* 订阅者名称
*/
private String name;
/**
* 定义最大消费数量
*/
private final long maxCount;
/**
* 计数器
*/
private long counter;
public SimpleSubscriber(String name, long maxCount) {
this.name = name;
this.maxCount = maxCount <= 0 ? 1 : maxCount;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.printf("订阅者:%s,最大消费数据: %d。%n", name, maxCount);
// 实际上是等于消费全部数据
subscription.request(maxCount);
}
@Override
public void onNext(Integer item) {
counter++;
System.out.printf("订阅者:%s 接收到数据:%d.%n", name, item);
if (counter >= maxCount) {
System.out.printf("准备取消订阅者: %s。已处理数据个数:%d。%n", name, counter);
// 处理完毕,取消订阅
subscription.cancel();
}
}
@Override
public void onError(Throwable t) {
System.out.printf("订阅者: %s,出现异常: %s。%n", name, t.getMessage());
}
@Override
public void onComplete() {
System.out.printf("订阅者: %s 处理完成。%n", name);
}
}

SimpleSubscriber是一个简单订阅者类,其逻辑是根据构造参数可以定义其名称name与最大处理数据值maxCount,最少处理一个数据。

当发布者进行一个订阅的时候会生成一个令牌Subscription作为参数调用onSubscribe方法。在订阅者需要捕获该令牌作为后续与发布者交互的纽带。一般来说在onSubscribe中至少调用一次request且参数需要>0,否则发布者将无法向订阅者发送任何信息,这也是为什么maxCount需要大于0。

当发布者开始发送数据后,会异步的调用onNext方法并将数据传入。该类中使用了一个计数器对数据数量进行校验,当达到最大值的时候,则会通过令牌(subscription)异步通知发布者订阅结束,然后发送者再异步的调用发订阅者的onComplete方法,以处理完成流程。

其中的onError和onComplete方法只进行打印,这里就不再说了。

以上的这个订阅者可以看作是一个push模型的实现,因为当开始订阅时订阅者就约定了需要接受的数量,然后在后续的处理(onNext)中不再请求新数据。

我们可以用以下的代码创建一个名称为S1,消费2个元素的订阅者:

SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);
订阅令牌交互

当我们可以创建了发送者和订阅者之后,我们需要确认一下进行交互的顺序,由于响应流的处理就是对于事件的处理,所以事件的顺序十分重要,具体顺序如下:

  1. 我们创建一个发布者publisher一个订阅者subscriber
  2. 订阅者subscriber通过调用发布者的subscribe()方法进行信息订阅。如果订阅成功,则发布者将生成一个令牌(Subscription)并作为入参调用订阅者的订阅事件方法onSubscribe()。如果调用异常则会直接调用订阅者的onError错误处理方法,并抛出IllegalStateException异常然后结束订阅。
  3. 在onSubscribe()中,订阅者需要通过调用令牌(Subscription)的请求方法request(long)来异步的向发布者请求数据。
  4. 当发布者有数据可以发布的时候,则会异步的调用订阅者的onNext()方法,直到所有消息的总数已经满足了订阅者调用request的数据请求上限。所以当订阅者请求订阅的消息数为Long.MAX_VALUE时,实际上是消费所有数据,即push模式。如果发布者没有数据要发布了,则可以会调用发布者自己的close()方法并异步的调用所有订阅者的onComplete()方法来通知订阅结束。
  5. 发布者可以随时向发布者请求更多的元素请求(一般在onNext里),而不用等到之前的处理完毕,一般是与之前的数据数量进行累加。
  6. 放发布者遇到异常的时候会调用订阅者的onError()方法。

上面的描述中是只使用的一个订阅者来进行描述的,后面的例子中将说明发布者可以拥有多个订阅者(甚至0个订阅者)。

发送信息

当发布者需要推送消息的时候会调用submit方法或者offer方法,上文中我们提到submit实际上是offer的一种简单实现,本节咱们自己比较一下。

首先他们的方法签名为:

int offer(T item, long timeout, TimeUnit unit, BiPredicate,? super T> onDrop)
int offer(T item, BiPredicate,? super T> onDrop)
int submit(T item)

而submit 和 offer的直接方法为:

public int submit(T item) {
return doOffer(item, Long.MAX_VALUE, null);
}
public int offer(T item,
BiPredicate, ? super T> onDrop) {
return doOffer(item, 0L, onDrop);

可以看到他们的底层调用的都是 doOffer 方法,而doOffer的方法签名为:

private int doOffer(T item, long nanos,
BiPredicate, ? super T> onDrop)

所以我们可以直接看doOffer()方法。doOffer()方法是可选阻塞时长的,而时长根据入参数nanos来决定。而onDrop()是一个删除判断器,如果调用BiPredicate的test()方法结果为true则会再次重试(根据令牌中的nextRetry属性与发布器中的retryOffer()方法组合判断,但是具体实现还没梳理明白);如果结果为flase则直接删除内容。doOffer()返回的结果为正负两种,正数的结果为发送了数据,但是订阅者还未消费的数据(估计值,因为是异步多线程的);如果为负数,则返回的是重拾次数。

所以,根据submit()的参数我们可以发现,submit会一直阻塞直到数据可以被消费(因为不会阻塞超时,所以不需要传入onDrop()方法)。而我们可以根据需要配置offer()选择器。如果必须要求数据都要被消费的话,那就可以直接选择submit(),如果要设置重试次数的话就可以选择使用offer()

异步调用的例子

下面看一个具体的程序例子,程序将以3秒为周期进行数据发布:

public class PeriodicPublisher {
public static final int WAIT_TIME = 2;
public static final int SLEEP_TIME = 3;
public static void main(String[] args) {
SubmissionPublisher publisher = new SubmissionPublisher<>();
// 创建4订阅者
SimpleSubscriber subscriber1 = new SimpleSubscriber("S1", 2);
SimpleSubscriber subscriber2 = new SimpleSubscriber("S2", 4);
SimpleSubscriber subscriber3 = new SimpleSubscriber("S3", 6);
SimpleSubscriber subscriber4 = new SimpleSubscriber("S4", 10);
// 前三个订阅者直接进行订阅
publisher.subscribe(subscriber1);
publisher.subscribe(subscriber2);
publisher.subscribe(subscriber3);
// 第四个方法延迟订阅
delaySubscribeWithWaitTime(publisher, subscriber4);
// 开始发送消息
Thread pubThread = publish(publisher, 5);
try {
// 等待处理完成
pubThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Thread publish(SubmissionPublisher publisher, int count) {
Thread t = new Thread(() -> {
IntStream.range(1,count)
.forEach(item ->{
publisher.submit(item);
sleep(item);
});
publisher.close();
});
t.start();
return t;
}

private static void sleep(Integer item) {
try {
System.out.printf("推送数据:%d。休眠 3 秒。%n", item);
TimeUnit.SECONDS.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void delaySubscribeWithWaitTime(SubmissionPublisher publisher, Flow.Subscriber sub) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(WAIT_TIME);
publisher.subscribe(sub);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

代码后是运行结果如下:

订阅者:S1,最大消费数据: 2。
推送数据:1。休眠 3 秒。
订阅者:S3,最大消费数据: 6。
订阅者:S2,最大消费数据: 4。
订阅者:S2 接收到数据:1.
订阅者:S3 接收到数据:1.
订阅者:S1 接收到数据:1.
订阅者:S4,最大消费数据: 10。
推送数据:2。休眠 3 秒。
订阅者:S2 接收到数据:2.
订阅者:S3 接收到数据:2.
订阅者:S1 接收到数据:2.
订阅者:S4 接收到数据:2.
准备取消订阅者: S1。已处理数据个数:2。
推送数据:3。休眠 3 秒。
订阅者:S4 接收到数据:3.
订阅者:S2 接收到数据:3.
订阅者:S3 接收到数据:3.
推送数据:4。休眠 3 秒。
订阅者:S4 接收到数据:4.
订阅者:S3 接收到数据:4.
订阅者:S2 接收到数据:4.
准备取消订阅者: S2。已处理数据个数:4。
推送数据:5。休眠 3 秒。
订阅者:S3 接收到数据:5.
订阅者:S4 接收到数据:5.
订阅者: S3 处理完成。
订阅者: S4 处理完成。

由于是异步执行,所以在“接收数据”部分的顺序可能不同。

我们分析一下程序的执行流程。

  • 创建一个发布者实例
  • 创建四个订阅者实例S1、S2、S3、S4,可以接收数据的数量分别为:2、4、6、10。
  • 前三个订阅者立即订阅消息。
  • S4的订阅者单独创建一个线程等待WAIT_TIME秒(2秒)之后进行数据的订阅。
  • 新建一个线程来以SLEEP_TIME秒(3秒)为间隔发布5个数据。
  • 将publish线程join()住等待流程结束。

执行的日志满足上述流程而针对一些关键点为:

  • S4在发送者推送数据"1"的时候还未订阅,所以S4没有接收到数据"1"。
  • 当发送数据"2"的时候S1已经接收够了预期数据2个,所以取消了订阅。之后只剩下S2、S3、S4。
  • 当发送数据"4"的时候S2已经接收够了预期数据4个,所以取消了订阅。之后只剩下S3、S4。
  • 当发送数据"5"的时候只剩下S3、S4,当发送完毕后publisher调用close()方法,通知S3、S4数据处理完成。

需要注意的是,如果在最后submit完毕之后直接close()然后结束进行的话可能订阅者并不能执行完毕。但是由于在任意一次submit()之后都有一次3秒的等待,所以本程序是可以执行完毕的。

最后

本文中的例子是是简单的实现,可以通过调整订阅者中的request的参数,与在onNext中添加request调用来测试背压的效果,还可以将submit调整为offer并添加onDrop方法以观察抛弃信息时的流程。同时本文没有提供Processor的例子,各位也可以自行学习。

总结一下流程: 订阅者向发布者进行订阅,然后发布者向订阅者发送令牌。订阅者使用令牌请求消息,发送者根据请求消息的数量推送消息。订阅者可以随时异步追加需要的更多信息。

JDK9中在Flow接口中实现了Java API的4个接口,并提供了SubmissionPublisher

作为Publisher

接口的简单实现。

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
毕福剑承认再婚且育有俩孩子!仍属央视老干部局,前妻加拿大生活

毕福剑承认再婚且育有俩孩子!仍属央视老干部局,前妻加拿大生活

花花lo先森
2024-11-26 09:19:13
国行Switch官宣停运后 各平台国行版仍然在售

国行Switch官宣停运后 各平台国行版仍然在售

游民星空
2024-11-26 18:27:19
香港警方抓获跨境卖淫团伙,3名日本AV女星“赴港外卖”被逮捕

香港警方抓获跨境卖淫团伙,3名日本AV女星“赴港外卖”被逮捕

这里是东京
2024-11-25 15:29:20
想让男孩情绪稳定,这9句话要频繁对他说

想让男孩情绪稳定,这9句话要频繁对他说

男孩派
2024-11-25 15:12:36
48小时内,俄罗斯遭大规模袭击,北约找到出兵借口,波兰要打头阵

48小时内,俄罗斯遭大规模袭击,北约找到出兵借口,波兰要打头阵

影孖看世界
2024-11-26 17:27:47
武汉楼市全军覆灭,武汉楼市汉阳区房价从1.7万跌至1.2万

武汉楼市全军覆灭,武汉楼市汉阳区房价从1.7万跌至1.2万

有事问彭叔
2024-11-26 16:39:59
凯特将参加卡塔尔国访接待,却没王冠戴,晚宴王冠造型指望索菲了

凯特将参加卡塔尔国访接待,却没王冠戴,晚宴王冠造型指望索菲了

DailyFlora
2024-11-26 16:46:38
刚“官宣”降价就火了!31天就卖出18585台,进口SUV才卖34万

刚“官宣”降价就火了!31天就卖出18585台,进口SUV才卖34万

沙雕小琳琳
2024-11-26 14:09:57
杨议直播间说的扒灰是怎么回事?德云社为何摘掉杨议的题字匾额?

杨议直播间说的扒灰是怎么回事?德云社为何摘掉杨议的题字匾额?

蜜桔娱乐
2024-11-26 15:18:58
正大量上市,一养肺止咳二润燥补血三养颜安神,冬天使劲吃身体棒

正大量上市,一养肺止咳二润燥补血三养颜安神,冬天使劲吃身体棒

斯佳丽的小厨房
2024-11-26 07:15:03
前美国情报官里特表示,拜登可能在2025年1月20日之前发动战争

前美国情报官里特表示,拜登可能在2025年1月20日之前发动战争

坠入二次元的海洋
2024-11-26 18:31:13
苏州一学校被曝学生宿舍环境脏乱差食堂有发霉鸡蛋?校方通报

苏州一学校被曝学生宿舍环境脏乱差食堂有发霉鸡蛋?校方通报

界面新闻
2024-11-26 14:18:40
俄国防部:苏-34战机使用滑翔炸弹摧毁了乌军在库尔斯克边境地区的军事装备和人员

俄国防部:苏-34战机使用滑翔炸弹摧毁了乌军在库尔斯克边境地区的军事装备和人员

俄罗斯卫星通讯社
2024-11-26 15:31:41
痛心!中国天才已留美任职,曾拒美国绿卡扬言回国效力,为何反悔

痛心!中国天才已留美任职,曾拒美国绿卡扬言回国效力,为何反悔

文史颜如玉
2024-11-12 08:00:16
市民晒官方回复文件200余字现7处错字,长沙县卫健局:工作失误,将重新回复

市民晒官方回复文件200余字现7处错字,长沙县卫健局:工作失误,将重新回复

上游新闻
2024-11-25 12:50:23
太突然!著名导演凌晨去世

太突然!著名导演凌晨去世

鲁中晨报
2024-11-24 09:24:08
《再见爱人》疑似离职剪辑师爆料,麦琳和黄圣依在节目中干架了?

《再见爱人》疑似离职剪辑师爆料,麦琳和黄圣依在节目中干架了?

奴染
2024-11-26 19:31:00
悲催!网传合肥网约车平台爆雷了,一直无法提现,办公室人去楼空

悲催!网传合肥网约车平台爆雷了,一直无法提现,办公室人去楼空

火山诗话
2024-11-26 13:19:23
王金兰举报刀郎剽窃后续!刀郎报警,更多内幕曝光:女方出卖刀郎

王金兰举报刀郎剽窃后续!刀郎报警,更多内幕曝光:女方出卖刀郎

拾娱先生
2024-11-25 23:07:29
老人五百万出售家传宝刀,专家失望指责,老人:你有啥资格

老人五百万出售家传宝刀,专家失望指责,老人:你有啥资格

眉宇奇侃
2024-11-25 07:10:32
2024-11-26 20:59:00
蜜糖的代码注释
蜜糖的代码注释
记录技术成长。
32文章数 29关注度
往期回顾 全部

科技要闻

"这是国产化最高,也是史上最强的Mate"

头条要闻

记者询问中方是否已与特朗普过渡团队接触 外交部回应

头条要闻

记者询问中方是否已与特朗普过渡团队接触 外交部回应

体育要闻

37岁,他用“半条右腿”重返巅峰

娱乐要闻

权威奖项沦为资本工具?谁来管一管

财经要闻

洪灏刘煜辉对谈实录 涉及A股、债务等!

汽车要闻

解决油车无法处理的难题 仰望U7数字底盘这么强

态度原创

旅游
时尚
艺术
游戏
军事航空

旅游要闻

晓华带火一座城,用心对文旅部门有多重要?

全员E人,占领都市!

艺术要闻

故宫珍藏的墨迹《十七帖》,比拓本更精良,这才是地道的魏晋写法

底气十足?腾讯在射击与开放世界赛道均手握王炸

军事要闻

新中导“榛树”亮相 俄乌都面临难题

无障碍浏览 进入关怀版