前言
這周的主要時間花在Flink上面,做了一個簡單的從文本檔案中讀取資料,然后存入資料庫的例子,能夠正常的實作功能,但是遇到個問題,我有四臺機器,自己搭建了一個standalone的集群,不論我把并行度設定多少,跑起來的耗時都非常接近,實在是百思不得其解,機器多似乎并不能幫助它, 把程序記錄在此,看后面隨著學習的深入能不能解答出這個問題,
嘗試過的修復方法
集群搭建
出現這個問題后,我從集群的角度來進行了些修改,
1,機器是2核的,slots被設定成了6,那我就有點懷疑是這個設定問題,因為其實只有2核,設定的多了,反而存在搶占資源,導致運行達不到效果,改成2后效果一樣,沒有改進,這個引數在
taskmanager.numberOfTaskSlots: 2
2,調整記憶體, taskmanager 從2G調整為4G, 效果也沒有變化,
taskmanager.memory.process.size: 4000m
這里說下這個記憶體,我們設定的是總的Memory,也就是這個Total Process Memory,
剔除掉些比較固定的Memory,剩下的大頭就是這個Task Heap 和 Managed Memory,
所以我們調整大小后,它兩個也就相應的增加了, 我查了下這兩個,可以理解為堆記憶體和堆外記憶體,
一個是存放我們程式的物件,會被垃圾回收器回收;一個是堆外記憶體,比如RockDB 和 快取 sort,hash 等的中間結果,
程式方面修改
最開始的時候我把保存資料庫操作寫在MapFunction里面,后來改到SinkFunction里面,
SinkFunction里面保存資料庫的方法也進行了反復修改,從開始使用Spring的JdbcTemplate,換成后來直接使用最原始JDBC, 而且還踩了一個坑,開始的時候用的注入的JdbcTemplate, 本地運行沒有問題,到了集群上面,發到別的機器的時候,注入的東西就是空的了,
換成原始的JDBC速度能提升不少, 我猜想這里的原因是jdbctemplate做了些多余的事情, JDBC打開一次,后面Invoke的時候就直接存了,效率要高些,所以速度上提升不少,
這里把部分代碼貼出來, 在Open的時候就預加載好PreparedStatement, Invoke的時候直接傳引數,呼叫就可以了,
public class SinkToMySQL2 extends RichSinkFunction<MarketPrice> {
private PreparedStatement updatePS;
private PreparedStatement insertPS;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HikariDataSource dataSource = new HikariDataSource();
connection = getConnection(dataSource);
if(connection != null)
{
String updateSQL = " update MarketPrice set open_price=?,high_price=?,low_price=?,close_price=? where performance_id = ? and price_as_of_date = ?";
updatePS = this.connection.prepareStatement(updateSQL);
String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
insertPS = this.connection.prepareStatement(insertSQL);
}
}
@Override
public void close() throws Exception {
super.close();
if (updatePS != null) {
updatePS.close();
}
if (insertPS != null) {
insertPS.close();
}
//關閉連接和釋放資源
if (connection != null) {
connection.close();
}
}
/**
* 每條資料的插入都要呼叫一次 invoke() 方法
*
* @param marketPrice
* @param context
* @throws Exception
*/
@Override
public void invoke(MarketPrice marketPrice, Context context) throws Exception {
log.info("start save for {}", marketPrice.getPerformanceId().toString() );
updatePS.setDouble(1,marketPrice.getOpenPrice());
updatePS.setDouble(2,marketPrice.getHighPrice());
updatePS.setDouble(3,marketPrice.getLowPrice());
updatePS.setDouble(4,marketPrice.getClosePrice());
updatePS.setString(5, marketPrice.getPerformanceId().toString());
updatePS.setInt(6, marketPrice.getPriceAsOfDate());
int result = updatePS.executeUpdate();
log.info("finish update for {} result {}", marketPrice.getPerformanceId().toString(), result);
if(result == 0)
{
String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
insertPS = this.connection.prepareStatement(insertSQL);
insertPS.setString(1, marketPrice.getPerformanceId().toString());
insertPS.setInt(2, marketPrice.getPriceAsOfDate());
insertPS.setDouble(3,marketPrice.getOpenPrice());
insertPS.setDouble(4,marketPrice.getHighPrice());
insertPS.setDouble(5,marketPrice.getLowPrice());
insertPS.setDouble(6,marketPrice.getClosePrice());
result = insertPS.executeUpdate();
log.info("finish save for {} result {}", marketPrice.getPerformanceId().toString(), result);
}
}
}
總結
從多個方面去改進,結果發現還是一樣的,就是使用一臺機器和使用三臺機器,時間上一樣的,再懷疑我只能懷疑是某臺機器有問題,然后運行的時候,由最慢的機器決定了速度, 我在使用MapFunction的時候有觀察到,有的時候,某臺機器已經處理上千條,而有的只處理了幾十條,到最后完成的時候,大家處理的數量又是很接近的,這樣能夠解釋為什么機器多了,速度卻是一樣的,但是我沒有辦法找出哪臺機器來, 我自己的本地運行,并行數設定的多,速度上面是有提升的,到了集群就碰到這樣的現象,后面看能不能解決它, 先記錄在此,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/550196.html
標籤:其他