最近在跑flink社區(qū)1.15版本使用json_value函數(shù)時,發(fā)現(xiàn)其性能很差,通過jstack查看堆棧經(jīng)常在執(zhí)行以下堆棧
專注于為中小企業(yè)提供成都網(wǎng)站設(shè)計、成都網(wǎng)站制作服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)霍山免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上1000家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。可以看到這里的邏輯是在等鎖,查看jsonpath的LRUCache
// | |
// Source code recreated from a .class file by IntelliJ IDEA | |
// (powered by FernFlower decompiler) | |
// | |
package org.apache.flink.table.shaded.com.jayway.jsonpath.spi.cache; | |
import java.util.Deque; | |
import java.util.LinkedList; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.locks.ReentrantLock; | |
import org.apache.flink.table.shaded.com.jayway.jsonpath.JsonPath; | |
public class LRUCache implements Cache { | |
private final ReentrantLock lock = new ReentrantLock(); | |
private final Map | |
private final Deque | |
private final int limit; | |
public LRUCache(int limit) { | |
this.limit = limit; | |
} | |
public void put(String key, JsonPath value) { | |
JsonPath oldValue = (JsonPath)this.map.put(key, value); | |
if (oldValue != null) { | |
this.removeThenAddKey(key); | |
} else { | |
this.addKey(key); | |
} | |
if (this.map.size() >this.limit) { | |
this.map.remove(this.removeLast()); | |
} | |
} | |
public JsonPath get(String key) { | |
JsonPath jsonPath = (JsonPath)this.map.get(key); | |
if (jsonPath != null) { | |
this.removeThenAddKey(key); | |
} | |
return jsonPath; | |
} | |
private void addKey(String key) { | |
this.lock.lock(); | |
try { | |
this.queue.addFirst(key); | |
} finally { | |
this.lock.unlock(); | |
} | |
} | |
private String removeLast() { | |
this.lock.lock(); | |
String var2; | |
try { | |
String removedKey = (String)this.queue.removeLast(); | |
var2 = removedKey; | |
} finally { | |
this.lock.unlock(); | |
} | |
return var2; | |
} | |
private void removeThenAddKey(String key) { | |
this.lock.lock(); | |
try { | |
this.queue.removeFirstOccurrence(key); | |
this.queue.addFirst(key); | |
} finally { | |
this.lock.unlock(); | |
} | |
} | |
private void removeFirstOccurrence(String key) { | |
this.lock.lock(); | |
try { | |
this.queue.removeFirstOccurrence(key); | |
} finally { | |
this.lock.unlock(); | |
} | |
} | |
... | |
} | |
可以看到get操作時,如果獲取到的是有值的,那么會更新相應(yīng)key的數(shù)據(jù)從雙端隊列移到首位,借此來實現(xiàn)LRU的功能,但是這樣每次get和put操作都是需要加鎖的,因此并發(fā)情況下吞吐就會比較低,也會導(dǎo)致cpu使用效率較低。
從jsonpath社區(qū)查看相應(yīng)的問題,也有相關(guān)的反饋
Performance bottlenecks · Issue #740 · json-path/JsonPath · GitHub
Fix JSONPath cache inefficient issue by Ferrari6 · Pull Request #7409 · apache/pinot · GitHub
比較方便的是,jsonpath 提供了spi的方式可以自定義的設(shè)置Cache的實現(xiàn)類,可以通過以下方式來設(shè)置新的cache實現(xiàn)。
static { | |
CacheProvider.setCache(new JsonPathCache()); | |
} |
從pinot的實現(xiàn)中,我們看到他是用了guava的cache來替換了默認(rèn)的LRUCache實現(xiàn),那么這樣實現(xiàn)性能優(yōu)化有多少呢,這里我們是用java的性能測試框架jmh來測試下性能提升的情況
性能測試這里為了方便,直接在flink-benchmark工程里添加了兩個benchmark的測試類.
GuavaCache
LRUCache
這里面需要注意,因為cache是進(jìn)程級別共享的,所以我們需要將設(shè)置@State(Benchmark)
級別,這樣我們構(gòu)建的cache就是進(jìn)程級別共享,而不是線程級別共享的。
寫的測試是4個線程運行,緩存大小均為400
為了避免在本機運行時受本機的其他程序影響,最好是build jar之后放到服務(wù)器上跑
java -jar target/benchmarks.jar -rf csv org.apache.flink.benchmark.GuavaCacheBenchmark |
得到一個測試結(jié)果
Benchmark Mode Cnt Score Error Units | |
GuavaCacheBenchmark.get thrpt 30 4480.563 ± 203.311 ops/ms | |
GuavaCacheBenchmark.put thrpt 30 1774.769 ± 119.198 ops/ms | |
LRUCacheBenchmark.get thrpt 30 441.239 ± 2.812 ops/ms | |
LRUCacheBenchmark.put thrpt 30 350.549 ± 12.285 ops/ms |
可以看到使用guava的cache后,get性能提升8倍左右,put性能提升5倍左右。
這塊性能提升的主要來源是cache的實現(xiàn)機制上,和caffeine 的作者在github上也簡單了解了下相關(guān)的推薦實現(xiàn)
后面會寫一篇文章來專門分析下caffeine cache的優(yōu)化實現(xiàn)。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧