本篇主要介紹了一種使用Rust語言撰寫的查詢引擎——DataFusion,其使用了基于Arrow格式的記憶體模型,結合Rust語言本身的優勢,達成了非常優秀的性能指標
DataFusion是一個查詢引擎而非資料庫,因此其本身不具備存盤資料的能力,但正因為不依賴底層存盤的格式,使其成為了一個靈活可擴展的查詢引擎,它原生支持了查詢CSV,Parquet,Avro,Json等存盤格式,也支持了本地,AWS S3,Azure Blob Storage,Google Cloud Storage等多種資料源,同時還提供了豐富的擴展介面,可以方便的讓我們接入自定義的資料格式和資料源,
DataFusion具有以下特性:
- 高性能:基于Rust,不用進行垃圾回收;基于Arrow記憶體模型,列式存盤,方便向量化計算
- 連接簡單:能夠與Arrow的其他生態互通
- 集成和定制簡單:可以擴展資料源,方法和算子等
- 完全基于Rust撰寫:高質量
基于DataFusion我們可以輕松構建高性能、高質量、可擴展的資料處理系統,
DBMS 與 Query Engine 的區別
DBMS: DataBase Management System
DBMS是一個包含完整資料庫管理特性的系統,主要包含以下幾個模塊:
- 存盤系統
- 元資料(Catalog)
- 查詢引擎(Query Engine)
- 訪問控制和權限
- 資源管理
- 管理工具
- 客戶端
- 多節點管理
Query Engine
DataFusion是一種查詢引擎,查詢引擎屬于資料庫管理系統的一部分,查詢引擎是用戶與資料庫互動的主要介面,主要作用是將面向用戶的高階查詢陳述句翻譯成可被具體執行的資料處理單元操作,然后執行操作獲取資料,
DataFusion架構
架構詳情
DataFusion查詢引擎主要由以下幾部分構成:
- 前端
- 語法決議
- 語意分析
- Planner:語法樹轉換成邏輯計劃
主要涉及
DFParser
和SqlToRel
這兩個struct
- 查詢中間表示
- Expression(運算式)/ Type system(型別系統)
- Query Plan / Relational Operators(關系算子)
- Rewrites / Optimizations(邏輯計劃優化)
主要涉及
LogicalPlan
和Expr
這兩個列舉類
- 查詢底層表示
- Statistics(物理計劃算子的統計資訊,輔助物理計劃優化)
- Partitions(分塊,多執行緒執行物理計劃算子)
- Sort orders(物理計劃算子對資料是否排序)
- Algorithms(物理計劃算子的執行演算法,如Hash join和Merge join)
- Rewrites / Optimizations(物理計劃優化)
主要涉及
PyhsicalPlanner
這個trait
實作的邏輯計劃到物理計劃的轉換,其中主要的關鍵點是ExecutionPlan
和PhysicalExpr
- 執行運行時(算子)
- 分配資源
- 向量化計算
主要涉及所有執行算子,如
GroupedHashAggregateStream
擴展點
DataFusion查詢引擎的架構還是比較簡單的,其中的擴展點也非常清晰,我們可以從以下幾個方面對DataFusion進行擴展:
用戶自定義函式UDF
無狀態方法
/// 邏輯運算式列舉類
pub enum Expr {
...
ScalarUDF {
/// The function
fun: Arc<ScalarUDF>,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
},
...
}
/// UDF的邏輯運算式
pub struct ScalarUDF {
/// 方法名
pub name: String,
/// 方法簽名
pub signature: Signature,
/// 回傳值型別
pub return_type: ReturnTypeFunction,
/// 方法實作
pub fun: ScalarFunctionImplementation,
}
/// UDF的物理運算式
pub struct ScalarFunctionExpr {
fun: ScalarFunctionImplementation,
name: String,
/// 引數運算式串列
args: Vec<Arc<dyn PhysicalExpr>>,
return_type: DataType,
}
用戶自定義聚合函式UADF
有狀態方法
/// 邏輯運算式列舉類
pub enum Expr {
...
AggregateUDF {
/// The function
fun: Arc<AggregateUDF>,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
/// Optional filter applied prior to aggregating
filter: Option<Box<Expr>>,
},
...
}
/// UADF的邏輯運算式
pub struct AggregateUDF {
/// 方法名
pub name: String,
/// 方法簽名
pub signature: Signature,
/// 回傳值型別
pub return_type: ReturnTypeFunction,
/// 方法實作
pub accumulator: AccumulatorFunctionImplementation,
/// 需要保存的狀態的型別
pub state_type: StateTypeFunction,
}
/// UADF的物理運算式
pub struct AggregateFunctionExpr {
fun: AggregateUDF,
args: Vec<Arc<dyn PhysicalExpr>>,
data_type: DataType,
name: String,
}
用戶自定義優化規則
Optimizer
定義了承載優化規則的結構體,其中optimize
方法實作了邏輯計劃優化的程序,優化規則串列中的每個優化規則會被以TOP-DOWN
或BOTTOM-UP
方式作用于邏輯計劃樹,優化規則串列會被實施多個輪次,我們可以通過實作OptimizerRule
這個trait
來實作自己的優化邏輯,
pub struct Optimizer {
/// All rules to apply
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
pub trait OptimizerRule {
/// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be
/// optimized by this rule.
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>>;
...
}
用戶自定義邏輯計劃算子
/// 邏輯計劃算子列舉類
pub enum LogicalPlan {
...
Extension(Extension),
...
}
/// 自定義邏輯計劃算子
pub struct Extension {
/// The runtime extension operator
pub node: Arc<dyn UserDefinedLogicalNode>,
}
/// 自定義邏輯計劃算子需要實作的trait
pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { ... }
用戶自定義物理計劃算子
/// 為自定義的邏輯計劃算子`UserDefinedLogcialNode`生成對應的物理計劃算子
pub trait ExtensionPlanner {
async fn plan_extension(
&self,
planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
/// DataFusion默認的邏輯計劃到物理計劃的轉換器提供了自定義轉換程序的結構體
pub struct DefaultPhysicalPlanner {
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}
/// 自定義物理計劃算子需要實作的trait
pub trait ExecutionPlan: Debug + Send + Sync { ... }
用戶自定義資料源
可以看出,自定義資料源其實就是生成一個對應的ExecutionPlan執行計劃,這個執行計劃實施的是掃表的任務,如果資料源支持下推的能力,我們在這里可以將projection
filters
limit
等操作下推到掃表時,
/// 自定義資料源需要實作的trait
pub trait TableProvider: Sync + Send {
...
async fn scan(
&self,
state: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>>;
...
}
用戶自定義元資料
pub trait CatalogProvider: Sync + Send {
...
/// 根據名稱獲取Schema
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
/// 注冊Schema
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
// use variables to avoid unused variable warnings
let _ = name;
let _ = schema;
Err(DataFusionError::NotImplemented(
"Registering new schemas is not supported".to_string(),
))
}
}
pub trait SchemaProvider: Sync + Send {
...
/// 根據表名獲取資料源
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
/// 注冊資料源
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support registering tables".to_owned(),
))
}
...
}
邏輯計劃(LogicalPlan)
邏輯計劃其實就是資料流圖,資料從葉子節點流向根節點
let df: DataFrame = ctx.read_table("http_api_requests_total")?
.filter(col("path").eq(lit("/api/v2/write")))?
.aggregate([col("status")]), [count(lit(1))])?;
這里我們就使用DataFusion的API介面構造了一個資料流,首先read_table
節點會從資料源中掃描資料到記憶體中,然后經過filter
節點按照條件進行過濾,最后經過aggregate
節點進行聚合,資料流過最后的節點時,就生成了我們需要的資料,
上述鏈式呼叫的API介面實際上并沒有真正執行對資料的操作,這里實際上是使用了建造者模式構造了邏輯計劃樹,最終生成的DataFrame
實際上只是包含了一下資訊:
pub struct DataFrame {
/// 查詢背景關系資訊,包含了元資料,用戶注冊的UDF和UADF,使用的優化器,使用的planner等資訊
session_state: SessionState,
/// 邏輯計劃樹的根節點
plan: LogicalPlan,
}
支持的邏輯計劃算子
點擊查看代碼
Projection
Filter
Window
Aggregate
Sort
Join
TableScan
Repartition
Union
Subquery
Limit
Extension
Distinct
Values
Explain
Analyze
SetVariable
Prepare
Dml(...)
CreateExternalTable
CreateView
CreateCatalogSchema
CreateCatalog
DropTable
DropView
邏輯計劃優化
目標:確保結果相同的情況下,執行更快
初始的邏輯計劃,需要經過多個輪次的優化,才能生成執行效率更高的邏輯計劃,DataFusion本身的優化器內置了很多優化規則,用戶也可以擴展自己的優化規則,
內置優化輪次
-
下推(Pushdown):減少從一個節點到另一個節點的資料的行列數
PushDownProjection
PushDownFilter
PushDownLimit
-
簡化(Simplify):簡化運算式,減少運行時的運算,例如使用布爾代數的法則,將
b > 2 AND b > 2
簡化成b > 2
,SimplifyExpressions
UnwrapCastInComparison
-
簡化(Simplify):洗掉無用的節點
-
平鋪子查詢(Flatten Subqueries):將子查詢用join重寫
DecorrelateWhereExists
DecorrelatedWhereIn
ScalarSubqueryToJoin
-
優化join:識別join謂詞
ExtractEqualJoinPredicate
RewriteDisjunctivePredicate
FilterNullJoinKeys
-
優化distinct
SingleDistinctToGroupBy
ReplaceDistinctWithAggregate
運算式運算(Expression Evaluation)
假設現在有這樣一個謂詞運算式
path = '/api/v2/write' or path is null
經過語法決議和轉換后,可以用如下運算式樹表示:
DataFusion在實施運算式運算時,使用了Arrow提供的向量化計算方法來加速運算
物理計劃(ExecutionPlan)
呼叫DataFusion提供的DefaultPhysicalPlanner
中的create_physical_plan
方法,可以將邏輯計劃樹轉換成物理計劃樹,其中物理計劃樹中的每個節點都是一個ExecutionPlan
,執行物理計劃樹時,會從根節點開始呼叫execute
方法,呼叫該方法還沒有執行對資料的操作,僅僅是將每個物理計劃算子轉換成一個RecordBatchStream
算子,形成資料流算子樹,這些RecordBatchStream
算子都實作了future
包提供的Stream
特性,當我們最終呼叫RecordBatchStream
的collect
方法時,才會從根節點開始poll
一次來獲取一下輪要處理的資料,根節點的poll
方法內會呼叫子節點的poll
方法,最終每poll
一次,整棵樹都會進行一次資料從葉子節點到根節點的流動,生成一個RecordBatch
,
DataFusion實作的物理計劃算子具有以下特性:
- 異步:避免了阻塞I/O
- 流式:資料是流式處理的
- 向量化:每次可以向量化地處理一個
RecordBatch
- 分片:每個算子都可以并行,可以產生多個分片
- 多核
結語
DataFusion本身只是一個簡單,高效,可擴展的查詢引擎框架,用戶可以將DataFusion作為開發大型資料中臺的基礎組件,也可以輕易地將DataFusion嵌入服務中作為查詢引擎,也可以使用DataFusion構建自己的資料庫系統,如果期望使用分布式的查詢引擎,可以關注基于Arrow
和DataFusion
搭建的分布式查詢引擎Ballista,
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/552588.html
標籤:其他
上一篇:MySQL的varchar存盤原理:InnoDB記錄存盤結構
下一篇:返回列表