關于寫異步代碼測試用例的一些思考
如果說異步代碼不好寫是共識的話,那么寫異步代碼測試用例就更難了。最近我剛剛完成了一個 Flaky 測試,所以想和大家分享一些關于寫異步測試用例的想法。
這篇文章里,我們會探索一個關于異步測試用例的常見問題 —— 如何強制規(guī)定某些線程的順序,如何強制某一個線程操作早于另一些執(zhí)行。通常我們并不想強行規(guī)定線程之間的順序,因為這違背了多線程的原則,所謂多線程就是 為了做到并發(fā),從而使得 CPU 可以根據當前資源及應用狀態(tài)選擇最佳的執(zhí)行順序。但是在測試中,為了確保測試結果的穩(wěn)定性,又必須明確線程順序。
測試節(jié)流閥(Throttler)
在軟件業(yè)里節(jié)流閥指的是用于限制并發(fā)操作個數,預留資源的模式,好比連接池,網絡緩存,或者 CPU 密集型操作。和其他同步工具不同的是,節(jié)流閥的角色是啟動“快速失敗”機制,即促使超額請求立即失敗,而不是等待。“快速失敗”機制之所以重要,是因為切 換操作,等待操作會消耗資源 —— 端口,線程,內存等。
以下就是一個節(jié)流閥的簡單實現(基本上是信號量的包裝,實際應用中應該是等待,重試等等)
- class ThrottledException extends RuntimeException("Throttled!")
- class Throttler(count: Int) {
- private val semaphore = new Semaphore(count)
- def apply(f: => Unit): Unit = {
- if (!semaphore.tryAcquire()) throw new ThrottledException
- try {
- f
- } finally {
- semaphore.release()
- }
- }
- }
現在我們開始基本的單元測試:測試單線程的節(jié)流閥(我們使用測試框架 specs2)。本例里,我們會驗證順序調用是否會超過節(jié)流閥的最大限制(maxCount變量如下所示)。注意,這里我們用的是單線程,所以我們并不驗證節(jié)流閥的“快速失敗”功能,這里的節(jié)流閥都處于不飽和狀態(tài)。事實上,我們只會測試節(jié)流閥在不飽和狀態(tài)下不會終止操作。
- class ThrottlerTest extends Specification {
- "Throttler" should {
- "execute sequential" in new ctx {
- var invocationCount = 0
- for (i <- 0 to maxCount) {
- throttler {
- invocationCount += 1
- }
- }
- invocationCount must be_==(maxCount + 1)
- }
- }
- trait ctx {
- val maxCount = 3
- val throttler = new Throttler(maxCount)
- }
- }
測試并發(fā)節(jié)流閥
前一個例子里,節(jié)流閥處于不飽和狀態(tài),因為單線程里節(jié)流閥一般都不會飽和。下面我們來測試一下多線程環(huán)境下節(jié)流閥是否還能工作良好。
設置如下:
- val e = Executors.newCachedThreadPool()
- implicit val ec: ExecutionContext=ExecutionContext.fromExecutor(e)
- private val waitForeverLatch = new CountDownLatch(1)
- override def after: Any = {
- waitForeverLatch.countDown()
- e.shutdownNow()
- }
- def waitForever(): Unit = try {
- waitForeverLatch.await()
- } catch {
- case _: InterruptedException =>
- case ex: Throwable => throw ex
- }
ExecutionContext 用來構建 Future,waitForever 方法用來持有線程,直到測試結束前的鎖釋放。接下來的函數里,我們會關閉一個執(zhí)行服務。
以下就是一個測試節(jié)流器多線程行為的例子:
- "throw exception once reached the limit [naive,flaky]" in new ctx {
- for (i <- 1 to maxCount) {
- Future {
- throttler(waitForever())
- }
- }
- throttler {} must throwA[ThrottledException]
我們創(chuàng)建了 maxCount 個線程(調用 Future{})來調用 waitForever 函數,該函數會一直直到道測試結束。然后我們繞開節(jié)流閥執(zhí)行另一個操作 —— maxCount + 1。預期的行為是,此時應該拋出 ThrottledException 例外。但是,也許預期的例外并不發(fā)生,因為接力器的最后的一個調用可能會比 future 里的先執(zhí)行(future 里會拋出例外,但是這不是預期結果)。
上面這個測試的問題是,在像期望中那樣節(jié)流閥拋出異常然后導致節(jié)流閥被違反之前,我們無法確定所有的線程都已經開始并且在 waitForever 函數中被阻塞。為了修復這個問題,我們需要一些方法去等待所有 future 開始。這有一個我們大多數都很熟悉的一種方法:只要增加一個 sleep 函數等待一些合適的時間。
- "throw exception once reached the limit [naive, bad]" in new ctx {
- for (i <- 1 to maxCount) {
- Future {
- throttler(waitForever())
- }
- }
- Thread.sleep(1000)
- throttler {} must throwA[ThrottledException]
- }
好了,現在這個測試幾乎都能通過了,但是這個方法還是錯的因為下面這兩個原因:
測試持續(xù)的時間至少會和我們設置好的”合適的時間”差不多久。
在非常罕見的情況下,比如機器處于高負載的時候,這個合適的時間不一定足夠。
如果你仍然感到疑惑,可以搜索一下 Google 更多的原因。
一個更好的方式是將我們的線程(future)的開始和我們期望的東西同步起來。我們來使用 java.util.concurrent 里面的 CountDownLatch 類:
- "throw exception once reached the limit [working]" in new ctx {
- val barrier = new CountDownLatch(maxCount)
- for (i <- 1 to maxCount) {
- Future {
- throttler {
- barrier.countDown()
- waitForever()
- }
- }
- }
- barrier.await(5, TimeUnit.SECONDS) must beTrue
- throttler {} must throwA[ThrottledException]
- }
我們使用 CountDownLatch 處理障礙同步。 這個等待的方法會阻塞主線程直到鎖存計數變?yōu)?0。隨著其它線程的運行(我們把這些其它線程表示為 future),每一個 future 都會調用 countDown 方法使鎖存計數減 1。一但計數變?yōu)?0,所有的 future 就都已經運行到 waitForever 方法中了。
通過那一點,我們可以確保 throttler 是飽和的,內部有最大數量(maxCount)的線程。另一個線程試圖進入 throttler 將導致異常。我們有一個確定的方式建立我們的測試,測試會有一個主線程進入 throttler。主線程可以恢復到這個點(門閂計數為 0 并等 CountDownLatch 釋放等待線程)。
如果一些意想不到的事情發(fā)生,我們使用超時略高保障避免無限阻塞發(fā)生。如果這樣的事情發(fā)生,我們的測試就失敗了。這個超時不會影響到測試時間,除非發(fā)生意外情況,否則,我們都不應該等待。
結論
測試異步程序時,通常需要在具體的測試用例中指定多個線程之間的執(zhí)行順序。不使用任何同步策略的測試是不可靠的,測試結果有時成功有時失敗。使用 Thread.sleep 降低了測試出錯的概率,但沒有完全解決這個問題。
在大多數情況下,當需要在測試中保證多個線程的執(zhí)行順序時,可以使用 CountDownLatch 代替 Thead.sleep。使用 CountDownlatch 的好處是通過它可以指定釋放(保持)線程的時機,有兩個優(yōu)點:確保按順序執(zhí)行使測試結果更可靠;加快了測試程序的執(zhí)行速度。即使對于普通的 waiting 操作,比如 waitForever 函數,盡管也可以使用 Thread.sleep(Long.MAX_VALUE) 這樣的函數實現,但為了保證程序的健壯性最好不要這樣做。
完整的代碼可以在 GitHub 中找到。