如果可以,购买《深入理解 kotlin 协程》支持作者 bennyhuo 霍丙乾!!!
异步和同步 异步和同步主要说的是代码执行的逻辑顺序
异步的概念 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 fun main () { print("A" ) print("B" ) print("C" ) print("D" ) print("A" ) thread { print("C" ) print("D" ) } print("B" ) }
对于同步代码,输出的肯定是ABCD
而对于异步代码,最早输出的是A
,但是B
会在CD
之前、之间还是之后,可就不一定了
如图:
异步设计的关键问题 结果传递 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 fun asyncBitmap ( url: String , callback: (Bitmap ) -> Unit ) : Bitmap? { return when (val bitmap = Cache.get (url)) { null -> { thread { download(url) .also { Cache.put(url, it) } .also(callback) } null } else -> bitmap } }
当Cache
有bitmap
的时候,就直接返回,否则就起一个线程来下载,并返回null
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 fun main () { callAsync { println("End 0" ) callAsync { println("End 1" ) } } } fun callAsync (callback: (Bitmap ) -> Unit ) { val bitmap = asyncBitmap( "https://www.bennyhuo.com/assets/avatar.jpg" ) { bitmap -> println("Async: $bitmap " ) callback(bitmap) } println("Main $bitmap " ) if (bitmap != null ) { callback(bitmap) } }
这里异步调用略微有点多,我们一点点看
首先我们先不看第二个callAsync
,只假设main
函数里callAsync
只调用了println("End 0")
,并没有调用第二个callAsync
,这样会容易一些
我们先看callAsync
的定义,里面只有 3 个操作
调用asyncBitmap
去获取bitmap
打印Main $bitmap
在bitmap
非空的时候,调用callback
这个 callback 就是println("End 0")
然后再看asyncBitmap
,发现Cache
里没有bitmap
,就会起一个线程下载图片并返回null
在下载成功的时候调用callback
,这个callback
就是比println("End 0")
多打印了一个println("Async: $bitmap")
1 2 println("Async: $bitmap " ) println("End 0" )
所以应该很清楚了
它去缓存里看是否有bitmap
,如果有就返回,然后在同步代码
调用println("End 0")
如果缓存没有bitmap
,就下载,在异步代码
里调用println("End 0")
所以这个println("End 0")
是在获取到bitmap
之后,再调用的
然后我们再回过头来看我们忽略的第二个callAsync
,是不是就是在上一段callAsync
之后,再进行一次callAsync
啊!
而这次我们Cache
里面就有bitmap
了,就不用再下载了,所以直接返回就行了
根据我们的猜测他的输出结果应该是什么呢?
1 2 3 4 5 Main null Async: [B@4bd36c21 End 0 Main [B@4bd36c21 End 1
前三行就是第一个callAsync
后两行就是第二个callAsync
当然,通常我们不会如此设计回调API,因为这样反而让程序写起 来更复杂了。更为常见的做法是,在结果就绪的情况下仍然立即以回 调的形式传递给调用方,以保证结果传递方式的透明性。
Kotlin
协程的挂起函数(suspend function
)本质上就采取 了这个异步返回值的设计思路
异常处理 我们改写一下asyncBitmap
,去掉返回值,加上异常处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 fun asyncBitmap ( url: String, onSuccess: (Bitmap) -> Unit, onError: (Throwable) -> Unit ) { thread { try { download (url).also (onSuccess) } catch (e: Exception) { onError (e) } } } fun callAsync2 () { try { val url = "https://www.bennyhuo.com/assets/avatar.jpg" checkUrl (url) asyncBitmap (url, onSuccess = ::show, onError = ::showError) } catch (e: Exception) { showError (e) } }
可以看到,我们不仅仅对download
进行了异常捕获,而对于url
也进行了异常捕获(毕竟一个不合法的url
你怎么都没法处理,只能抛异常了)
通常情况下,我们需要在同步代码和异步代码都进行异常捕获,为什么呢?
你们看callAsync2
,如果只在这个函数里进行异常捕获,那么还能捕获到download
的异常吗?
很显然不能,因为它起了另一个线程,已经不是同一个栈了
而协程他就是想解决将异步逻辑同步化,让异步的异常处理可以向同步处理一样
就像这样:
取消响应 稍微了解过Java
多线程的肯定都知道,取消或者中断一个线程并不是一件容易的事
现在往往是调用interrupt()
函数,然后等待线程判断到interrupted()
,再自我执行中断逻辑
也就是需要被中断的线程配合才能中断,但线程如果不配合,你也没办法,这时的异步任务颇有断线风筝的意思,能否回来只能看风筝自己的“心情”了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 fun asyncBitmapCancellable ( url: String , onSuccess: (Bitmap ) -> Unit , onError: (Throwable ) -> Unit ) = thread { try { downloadCancellable(url).also(onSuccess) } catch (e: Exception) { onError(e) } } fun downloadCancellable (url: String ) : Bitmap { return getAsStream(url).use { inputStream -> val bos = ByteArrayOutputStream() val buffer = ByteArray(1024 ) while (true ) { val length = inputStream.read(buffer, 0 , 1024 ) ...... if (Thread.interrupted()) throw InterruptedException("Task is cancelled." ) } bos.toByteArray() } }
JDK
最初提供了停止线程的API
,但它很快就被废弃 了,因为强行停止一个线程会导致该线程中持有的资源无法正常释 放,进而出现不安全的程序状态。
添加分支 我们可以为同步的逻辑添加分支甚至循环操作,这非常容易,但对于异步的逻辑而言,想要做到这一点就相对困难了。
当然这里同步逻辑的分支肯定也是同步的,异步逻辑的分支肯定也是异步的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 val urls = arrayOf( "https://www.bennyhuo.com/assets/avatar.jpg" , "https://www.bennyhuo.com/assets/avatar.jpg" , "https://www.bennyhuo.com/assets/avatar.jpg" ) fun loopOnAsyncCalls () { val countDownLatch = CountDownLatch(urls.size) val map = urls.associateWithTo(ConcurrentHashMap<String, Bitmap>()) { EMPTY_BITMAP } urls.map { url -> asyncBitmap(url, onSuccess = { map[url] = it countDownLatch.countDown() }, onError = { showError(it) countDownLatch.countDown() }) } countDownLatch.await() val bitmaps = map.values } fun asyncBitmap ( url: String , onSuccess: (Bitmap ) -> Unit , onError: (Throwable ) -> Unit ) { thread { try { download(url).also(onSuccess) } catch (e: Exception) { onError(e) } } }
countDownLatch
是一个计数器,我们这里让系统下载3个图片,就初始化为3
每下载完成或者下载失败就调用countDownLatch.countDown()
让数值减1
然后调用countDownLatch.await()
在这里阻塞,等待数值降到0
我们再写一个同步流程,同样下载三个图片
1 2 3 4 5 6 7 8 9 val urls = arrayOf( "https://www.bennyhuo.com/assets/avatar.jpg" , "https://www.bennyhuo.com/assets/avatar.jpg" , "https://www.bennyhuo.com/assets/avatar.jpg" ) fun loopOnSyncCalls () { val bitmaps = urls.map { download(url) } }
然后再测量这两次的耗时
1 2 3 4 5 6 7 8 9 10 fun main () { download("https://www.bennyhuo.com/assets/avatar.jpg" ) measureTimeMillis { loopOnAsyncCalls() }.let(::println) measureTimeMillis { loopOnSyncCalls() }.let(::println) }
download
是为了先握手,这样后续就可以直接下载,无需握手,让对比更公平
可以看到,确实有一些差距,异步分支确实可以节省一些时间,更充分地利用性能
想想,如果你需要在异步逻辑里写循环,你还希望回收这些个线程,还有可能中断线程,这可就复杂了,因为在循环里,异步逻辑有多少个,还不清楚。
常见异步程序的设计思路 Java Future 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 val urls = arrayOf( "https://www.bennyhuo.com/assets/avatar.jpg" , "https://www.bennyhuo.com/assets/avatar.jpg" , "https://www.bennyhuo.com/assets/avatar.jpg" ) fun bitmapFuture (url: String ) : Future<Bitmap> { return ioExecutor.submit(Callable { download(url) }) } val ioExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) fun main () { val bitmaps = urls.map { bitmapFuture(it) }.map { it.get () } }
在这里,我们创建了一个线程池,通过submit
获取一个Future
kotlin
map
函数,翻译过来是映射函数,没错这个map
是个动词,就是映射
1 2 3 4 5 6 7 8 9 public inline fun <T, R> Array<out T> .map (transform: (T ) -> R ) : List<R> { return mapTo(ArrayList<R>(size), transform) } public inline fun <T, R, C : MutableCollection<in R> > Array<out T> .mapTo (destination: C , transform: (T ) -> R ) : C { for (item in this ) destination.add(transform(item)) return destination }
两个都是inline
,我直接第二个合并到第一个
1 2 3 4 5 6 public inline fun <T, R> Array<out T> .map (transform: (T ) -> R ) : List<R> { val arrayList = ArrayList<R>(size) for (item in this ) arrayList.add(transform(item)) return arrayList }
可以看到,返回值是List<R>
,仅仅只调用transform(item)
并添加到这个List
里面
第一个map
返回一个List<Future>
第二个map
返回一个List<Bitmap>
第一个map
肯定很快就返回了,但是第二个map
相当于在for
循环里调用get
函数,而get
是一个阻塞函数。所以第二个map
代码逻辑大概就是这样:
1 2 for (item in this ) arrayList.add(future.get ())
所以,单纯使用get
并不好用
我们往往会搭配isDone
来使用
比如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 fun main () { val bitmapLists = urls.map { bitmapFuture(it) } val bitmaps = bitmapLists.map { while (true ){ if (it.isDone()){ it.get () break } } } }
你看,Future
的get
的阻塞性质,导致我们写出的代码都不太像异步了
Java CompletableFuture JDK 1.8
又新增了一个CompletableFuture
类,它实现了Future
接口, 通过它我们可以拿到异步任务的结果,此外,它还有很多更好用的方 法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 fun bitmapCompletableFuture (url: String ) : CompletableFuture<Bitmap> = CompletableFuture.supplyAsync { download(url) } fun callCompletableFuture () { urls.map { bitmapCompletableFuture(it) }.let { futureList -> CompletableFuture.allOf(*futureList.toTypedArray()) .thenApply { futureList.map { it.get () } } }.thenAccept { bitmaps -> println(bitmaps.size) }.join() }
这里allof
是用于等待所有CompletableFuture
。
thenApply
用于在所有CompletableFuture
完成后,将结果映射为一个包含所有Bitmap
的列表
thenAccept
用于在所有图片都下载完成后执行指定的操作,这里只是输出图片的数量。
虽然CompletableFuture
解决了异步结果不阻塞主调用流程的问题,但却让结果的获取脱离了主调用流程。
就类似retrofit
默认回调那样
JavaScript Promise 与 async/await CompletableFuture
还实现了另一个接口——CompletionStage
, 前面我们用到的thenAccept
类似的方法也都是这个接口的API
。从定义 和功能来看,CompletionStage
是一个Promise
。
那么Promise
又是什么呢?按照 Promises/A+ ,Promise
是一个异步任务,它存在挂起、完成、拒绝三个状态,当它处在完成状态时,结果 通过调用then
方法的参数进行回调;出现异常拒绝时,通过catch
方法 传入的参数来捕获拒绝的原因。
从ECMAScript 6
开始,JavaScript
就已经支持Promise
了,我们先 来看之前的例子怎么用Promise
来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 const urls = [ "https://www.bennyhuo.com/assets/avatar.jpg" , "https://www.bennyhuo.com/assets/avatar.jpg" , "https://www.bennyhuo.com/assets/avatar.jpg" ]; function main ( ) { Promise .all (urls.map (url => bitmapPromise (url))) .then (bitmaps => console .log (bitmaps.length )) .catch (e => console .error (e)); } function bitmapPromise (url ) { return new Promise ((resolve, reject ) => { try { download (url, resolve) } catch (e) { reject (e) } }) } function download (url, callback ) { https.get (url, (res ) => { let data = '' ; res.on ('data' , chunk => { data += chunk; }); res.on ('end' , () => { callback (data) }); } ); }
第8
行,Promise.all
会将多个Promise
整合到一起,这与我们前面为整合CompletableFuture
而定义的List<CompletableFuture<T>>.allOf
如出 一辙。
最终我们得到一个新的Promise
,它的结果是整合了前面所有bitmapPromise
函数返回的结果的bitmaps
,因此我们在then
当中传入 的Lambda
表达式就是用来处理消费这个bitmaps
的。
这样看起来很不错,达到了与CompletableFuture
同样的效果,不 过还可以更简洁。
我们可以通过async/await
将上面的代码进一步简化
1 2 3 4 5 6 7 8 async function asyncMain ( ) { try { const bitmaps = await Promise .all (urls.map (url => bitmapPromise (url))); console .log (bitmaps); } catch (e) { console .error (e); } }
给整个逻辑的外部函数加上了async
关键字,这样就可以在异 步调用返回Promise
的位置加上await
,这个语法糖可以把前面的then
和catch
调用转换成我们熟悉的同步调用语法。
这样就非常像样了,我们通过同步代码执行了异步流程
响应式编程 写过rxJava
、liveData
、 观察者模式之类的代码坑你定对响应式编程比较熟悉
1 2 3 4 5 6 7 8 9 10 11 12 fun main () { Single.just("https://www.bennyhuo.com/assets/avatar.jpg" ) .map { download(it) } .subscribeOn(Schedulers.io()) .subscribe({ bitmap -> show(bitmap) }, { throwable -> showError(throwable) }) Thread.sleep(10000 ) }
Single
是RxJava
提供的一个像Promise
的API
不过,Observable
跟前面提到的Future
和Promise
有一个很大的不 同,它的逻辑执行取决于订阅,而不是立即执行。
此外,它还提供了 任意变换之间可以切换线程调度器的能力,这一能力让复杂的数据变 换和流转可以轻易实现异步。
当然,这也曾一度让它被滥用为线程切换的工具。
kotlin 协程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 suspend fun bitmapSuspendable (url: String ) : Bitmap = suspendCoroutine { continuation -> thread { try { continuation.resume(download(url)) } catch (e: Exception) { continuation.resumeWithException(e) } } } suspend fun main () { try { val bitmap = bitmapSuspendable("https://www.bennyhuo.com/assets/avatar.jpg" ) show(bitmap) } catch (e: Exception) { println("下载失败" ) e.printStackTrace() } println("hello world!" ) }
我们使用标准库APIsuspendCoroutine<T>
函数的返回值类型作为 挂起函数bitmap-Suspendable
的返回值类型,也就是泛型参数T的实参Bitmap
。
这个函数除了确定返回值类型外,还能够帮我们拿到一个Continuation
的实例,负责保存和恢复挂起状态,逻辑效果上类似于Promise
,其中几个函数意义如下。
-resume
: 类似于Promise
的resolve
,将正常的结果返回,它的参数实际上就是bitmapSuspendable
的返回值Bitmap
。
-resumeWithException
: 类似于Promise
的reject
,将异常返回,它的参数实际上就是bitmapSuspendable
调用时会抛出的异常。
看我们调用的时候,直接try-catch
现在我们关掉网络,跑一遍
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 下载失败 hello world! java.net.UnknownHostException: www.bennyhuo.com: nodename nor servname provided, or not known at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:930) at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1543) at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:848) at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1533) at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1386) at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1307) at okhttp3.Dns.lambda$static$0(Dns.java:39) at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171) at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:135) at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84) at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:187) at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108) at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88) at okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169) at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) at common.api.ApiKt$httpClient$2.invoke$lambda$1(Api.kt:12) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229) at okhttp3.RealCall.execute(RealCall.java:81) at ch01.Listing05_06Kt.getAsStream(Listing05_06.kt:61) at ch01.Listing05_06Kt.download(Listing05_06.kt:51) at ch01.Listing23_24Kt$bitmapSuspendable$2$1.invoke(Listing23_24.kt:12) at ch01.Listing23_24Kt$bitmapSuspendable$2$1.invoke(Listing23_24.kt:10) at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
这里看到,信息都清楚地打印出来了
总结
本质上,异步和同步这两个概念探讨的是程序的控制流程,异步的同时也经常伴随着并发,但这不是必然的。
-Kotlin
协程是用来简化异步程序设计的,可以在实现任务的异步调用的同时,降低代码的设计复杂度,进而提升代码可读性。