阿里二面:Java8的Stream api是迭代一次還是迭代多次
面試官:java8新增的stream api用過嗎?
我:這個(gè)必須用過啊。
面試官:給你下面一個(gè)字符串?dāng)?shù)組,如果用stream api來實(shí)現(xiàn),找出以字符'a'開頭長(zhǎng)度最大的字符串,使用stream api該怎么實(shí)現(xiàn)呢?
- {"abb","abcd","fegc","efe","adfes"}
我:用下面這個(gè)方法來實(shí)現(xiàn):
- public static void maxLength(List<String> list){
- System.out.println(list.stream().filter(s -> s.startsWith("a")).mapToInt(r -> length(r)).max().orElse(0));;
- }
面試官:這個(gè)操作是迭代一次還是迭代兩次呢?也就是說是先迭代一遍,過濾出以字符'a'開頭的字符串?dāng)?shù)組,然后再迭代一次,找出最大長(zhǎng)度,還是一次迭代完成呢?
我:這個(gè)是迭代一次完成,如果要是迭代多次,stream后面的操作函數(shù)很多的情況下效率會(huì)非常低。我們加個(gè)打印可以來驗(yàn)證結(jié)果,代碼如下:
- public static void main(String[] args) {
- List<String> list = Arrays.asList("abb", "abcd", "fegc", "efe", "adfes");
- int maxLength = list.stream().
- filter(s -> isStartWitha(s)).
- mapToInt(StreamTest1::length).
- max().orElse(0);
- System.out.println("以字符a開頭的字符串最大長(zhǎng)度:" + maxLength);
- }
- private static boolean isStartWitha(String a){
- System.out.println(a + " is start with a:" + a.startsWith("a"));
- return a.startsWith("a");
- }
- private static int length(String a){
- System.out.println("the length of" + a + ":" + a.length());
- return a.length();
- }
打印結(jié)果如下:
- abb is start with a:true
- the length of abb:3
- abcd is start with a:true
- the length of abcd:4
- fegc is start with a:false
- efe is start with a:false
- adfes is start with a:true
- the length of adfes:5
- 以字符a開頭的字符串最大長(zhǎng)度:5
面試官:你確定只是迭代一次嗎?有其他情況嗎?
我:有。filter是一個(gè)無狀態(tài)的中間操作,對(duì)于這個(gè)中間操作來說,stream處理只需要迭代一次。但是對(duì)于有狀態(tài)的中間操作,就需要迭代多次。
面試官:你剛剛提到有狀態(tài)的操作和無狀態(tài)的操作,這個(gè)是怎么區(qū)分呢?
我:在stream api中,無狀態(tài)的操作是指當(dāng)前元素的操作不受前面元素的影響,主要包括如下方法:
- filter(),flatMap(),flatMapToInt(),flatMapToLong(),flatMapToDouble(),map(),mapToInt(),mapToDouble(),mapToLong(),peek(),unordered()
而有狀態(tài)的操作是指需要等所有元素處理完之后才能執(zhí)行當(dāng)前操作,主要包括下面方法:
- distinct(),limit(),skip(),sorted(),sorted()
面試官:有狀態(tài)的操作,能舉個(gè)例子嗎?
我:比如下面這段代碼:
- public static void main(String[] args) {
- List<Integer> list = Arrays.asList(5, 2, 3, 1, 4);
- List<Integer> newArray = list.stream()
- .map(StreamTest2::map1)
- .sorted((o1, o2) -> o1 - o2)
- .map(StreamTest2::map2)
- .collect(Collectors.toList());
- System.out.println("新的有序數(shù)組:" + newArray);
- }
- private static Integer map1(Integer i) {
- int result = i * 10;
- System.out.println("線程:" + Thread.currentThread().getName() + " 方法map1入?yún)ⅲ?quot; + i + ",輸出:" + result);
- return result;
- }
- private static Integer map2(Integer i) {
- int result = i * 10;
- System.out.println("線程:" + Thread.currentThread().getName() + " 方法map2入?yún)ⅲ?quot; + i + ",輸出:" + result);
- return result;
- }
上面代碼中,對(duì)原始數(shù)組進(jìn)行了兩次迭代,第一次迭代對(duì)所有數(shù)組元素都調(diào)用了map1方法乘以10,然后對(duì)新數(shù)組進(jìn)行排序,第二次迭代對(duì)排序后的數(shù)組元素調(diào)用map2方法,即對(duì)排序后的數(shù)組元素乘以10。方法輸出如下:
- 線程:main 方法map1入?yún)ⅲ?,輸出:50
- 線程:main 方法map1入?yún)ⅲ?,輸出:20
- 線程:main 方法map1入?yún)ⅲ?,輸出:30
- 線程:main 方法map1入?yún)ⅲ?,輸出:10
- 線程:main 方法map1入?yún)ⅲ?,輸出:40
- 線程:main 方法map2入?yún)ⅲ?0,輸出:100
- 線程:main 方法map2入?yún)ⅲ?0,輸出:200
- 線程:main 方法map2入?yún)ⅲ?0,輸出:300
- 線程:main 方法map2入?yún)ⅲ?0,輸出:400
- 線程:main 方法map2入?yún)ⅲ?0,輸出:500
- 新的有序數(shù)組:[100, 200, 300, 400, 500]
面試官:了解過底層原理嗎?
我:我來先畫一下Stream的UML類圖:
這個(gè)類圖說明以下幾點(diǎn):
- AbstractPipeline有基本類型的子類,如LongPipeline和DoublePipeline,還有一個(gè)引用類型的子類ReferencePipeline。
- 無論是ReferencePipeline,還是LongPipeline和DoublePipeline等基本類型的Pipeline,都有3個(gè)內(nèi)部類來繼承自己。
- StatelessOp對(duì)應(yīng)無狀態(tài)的操作,StatefulOp對(duì)應(yīng)有狀態(tài)的操作,Head對(duì)應(yīng)Collection.stream()方法返回結(jié)果。
- 無論是StatelessOp、StatefulOp還是Head,都是一個(gè)Pipeline,這些Pipeline用雙向鏈表串聯(lián)起來,每個(gè)Pipeline節(jié)點(diǎn)被看作一個(gè)Stage,Head是鏈表的頭結(jié)點(diǎn)。上面UML類圖中AbstractPipeline類中previousStage和nextStage就代表雙向鏈表當(dāng)前節(jié)點(diǎn)指向前后節(jié)點(diǎn)的引用。如下圖:
面試官:上面用雙向鏈表把所有操作都串聯(lián)起來了,這樣可以實(shí)現(xiàn)從Head節(jié)點(diǎn)開始依次執(zhí)行所有的操作。但是這些操作怎么疊加在一起呢?比如下面這段代碼有三個(gè)map方法,后面的方法要依賴前面的計(jì)算結(jié)果:
- List<Integer> list = Arrays.asList(5, 2, 3, 1, 4);
- List<Integer> newArray = list.stream().map(StreamTest2::map1).map(StreamTest2::map2).map(StreamTest2::map3).collect(Collectors.toList());
我:Stream提供了Sink接口來處理操作的疊加。上面代碼的map方法把操作封裝到了Sink,每個(gè)節(jié)點(diǎn)執(zhí)行操作時(shí),調(diào)用Sink的accept方法就可以把操作結(jié)果傳給下一個(gè)節(jié)點(diǎn)的Sink。比如map方法源代碼如下:
- public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
- Objects.requireNonNull(mapper);
- return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
- StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
- @Override
- //返回包裝成的Sink
- Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
- return new Sink.ChainedReference<P_OUT, R>(sink) {
- @Override
- public void accept(P_OUT u) {
- //downstream是下游節(jié)點(diǎn)的Sink,把當(dāng)前節(jié)點(diǎn)的執(zhí)行結(jié)果傳給下游節(jié)點(diǎn)
- downstream.accept(mapper.apply(u));
- }
- };
- }
- };
- }
面試官:能詳細(xì)講一下Sink嗎?
我:Sink主要提供了下面4個(gè)方法
- //執(zhí)行操作之前調(diào)用這個(gè)方法
- void begin(long size)
- //執(zhí)行操作之后調(diào)用這個(gè)方法
- void end()
- //是否可以結(jié)束操作
- boolean cancellationRequested()
- //操作執(zhí)行函數(shù)
- void accept()
對(duì)于有狀態(tài)的操作,必須實(shí)現(xiàn)begin和end兩個(gè)方法,因?yàn)閎egin方法會(huì)創(chuàng)建一個(gè)存放中間結(jié)果的容器,accept方法將元素放入該容器,end方法負(fù)責(zé)對(duì)容器中元素處理,比如排序。
面試官:那cancellationRequested方法什么時(shí)候用呢?
我:這個(gè)方法用于短路操作,比如stream.findAny。
面試官:你剛剛提到短路操作,怎么區(qū)分短路操作和非短路操作呢?
我:短路操作和非短路操作都是Stream的結(jié)束操作,結(jié)束操作是針對(duì)中間操作來說的。短路操作是指不用處理全部元素就可以結(jié)束,包括下面的方法:
- anyMatch(),allMatch(),noneMatch(),findFirst(),findAny()
非短路操作是指需要處理所有元素才能結(jié)束,包括下面的方法:
- forEach(),forEachOrdered(),toArray(),reduce(),collect(),max(),min(),count()
總結(jié)一下Stream操作,如下圖:
在遇到結(jié)束操作時(shí),所有Pipeline節(jié)點(diǎn)封裝的Sink會(huì)串成一個(gè)鏈表,如下圖:
把Sink串成鏈表的過程可以參考下面這段源代碼:
- final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
- Objects.requireNonNull(sink);
- for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
- sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
- }
- return (Sink<P_IN>) sink;
- }
這樣從Head節(jié)點(diǎn)開始依次調(diào)用每個(gè)節(jié)點(diǎn)封裝的Sink中的begin,accept,cancellationRequested,end 四個(gè)方法就可以完成Steam流水線的執(zhí)行。
面試官:上面提到了Sink會(huì)串成一個(gè)鏈,那對(duì)于有返回結(jié)果的操作,返回的結(jié)果是保存在什么地方呢?
我:這里分三種情況:
- 如果返回結(jié)果是boolean(比如 anyMatch、allMatch、noneMatch)和Optional(比如 findFirst、findAny),返回結(jié)果存放在對(duì)應(yīng)的Sink。
- collect, reduce等規(guī)約操作,返回結(jié)果存放在用戶指定的容器中,比如如下代碼返回結(jié)果放在Optional容器中:
- Optional accResult = Stream.of(1, 2, 3, 4, 5).reduce((sum, item) -> {
- sum += item;
- return sum;
- });
max 和 min也是規(guī)約操作,因?yàn)榈讓邮峭ㄟ^調(diào)用 reduce 方法實(shí)現(xiàn)的。
- 對(duì)于返回是數(shù)組的情況,返回?cái)?shù)組之前,數(shù)據(jù)會(huì)存放在一種多叉樹數(shù)據(jù)結(jié)構(gòu)中,這種多叉樹結(jié)構(gòu)元素存儲(chǔ)在樹的葉子當(dāng)中,一個(gè)葉子節(jié)點(diǎn)可以存放多個(gè)元素。
面試官:上面你提到返回?cái)?shù)組的時(shí)候用到了多叉樹的結(jié)構(gòu),這樣做對(duì)于Stream處理有什么好處呢?
我:按照官方的說法,這樣做是為了避免在并行操作期間不必要地復(fù)制數(shù)據(jù)。
面試官:能簡(jiǎn)單介紹一下Stream的并行處理嗎?
我:Stream的并行處理用到了Fork/Join框架,如下圖:
計(jì)算過程中,先把任務(wù)拆解成子任務(wù),并行計(jì)算。計(jì)算完成后再把子任務(wù)計(jì)算結(jié)果合并成結(jié)果集。
面試官:Fork/Join框架跟普通線程池相比,有什么優(yōu)勢(shì)嗎?
我:fork/join框架的優(yōu)勢(shì)是, 如果某個(gè)子任務(wù)需要等待另外一個(gè)子任務(wù)完成才能繼續(xù)工作,那么處理線程會(huì)主動(dòng)尋找其他未完成的子任務(wù)進(jìn)行執(zhí)行。跟普通線程池相比,減少了等待時(shí)間。
面試官:使用Stream并行流,一定會(huì)比串行快嗎?
我:這個(gè)不一定,使用的時(shí)候要考慮以下幾個(gè)因素:
- 要處理的元素?cái)?shù)量,數(shù)據(jù)越多,性能提升越明顯。
- 數(shù)據(jù)結(jié)構(gòu)的可分割性,數(shù)組、ArrayList支持隨機(jī)讀取,可分割性好,HashSet、TreeSet雖然可以分割,但不太容易分割均勻,LinkedList、Streams.iterate、BufferedReader.lines因?yàn)殚L(zhǎng)度未知,可分解性差。
- 盡量使用基本類型,避免裝箱拆箱。
- 單個(gè)子任務(wù)花費(fèi)時(shí)間越長(zhǎng),帶來的性能提升就會(huì)越大。
面試官:據(jù)說Stream api跟普通迭代相比有性能損耗,你怎么看?
我:對(duì)于簡(jiǎn)單的處理操作,Stream api性能確實(shí)不如普通迭代。但是如果CPU性能好的話,使用Stream并行處理性能會(huì)明細(xì)提高。對(duì)于復(fù)雜處理操作,無論并行還是串行,Stream api有明顯的優(yōu)勢(shì)。
對(duì)于并行處理,要考慮CPU的核數(shù)。