主頁 > 資料庫 > Apache Arrow DataFusion原理與架構

Apache Arrow DataFusion原理與架構

2023-05-16 15:30:16 資料庫

本篇主要介紹了一種使用Rust語言撰寫的查詢引擎——DataFusion,其使用了基于Arrow格式的記憶體模型,結合Rust語言本身的優勢,達成了非常優秀的性能指標

DataFusion是一個查詢引擎而非資料庫,因此其本身不具備存盤資料的能力,但正因為不依賴底層存盤的格式,使其成為了一個靈活可擴展的查詢引擎,它原生支持了查詢CSV,Parquet,Avro,Json等存盤格式,也支持了本地,AWS S3,Azure Blob Storage,Google Cloud Storage等多種資料源,同時還提供了豐富的擴展介面,可以方便的讓我們接入自定義的資料格式和資料源,

DataFusion具有以下特性:

  • 高性能:基于Rust,不用進行垃圾回收;基于Arrow記憶體模型,列式存盤,方便向量化計算
  • 連接簡單:能夠與Arrow的其他生態互通
  • 集成和定制簡單:可以擴展資料源,方法和算子等
  • 完全基于Rust撰寫:高質量

基于DataFusion我們可以輕松構建高性能、高質量、可擴展的資料處理系統,

DBMS 與 Query Engine 的區別

DBMS: DataBase Management System

DBMS是一個包含完整資料庫管理特性的系統,主要包含以下幾個模塊:

  • 存盤系統
  • 元資料(Catalog)
  • 查詢引擎(Query Engine)
  • 訪問控制和權限
  • 資源管理
  • 管理工具
  • 客戶端
  • 多節點管理

Query Engine

DataFusion是一種查詢引擎,查詢引擎屬于資料庫管理系統的一部分,查詢引擎是用戶與資料庫互動的主要介面,主要作用是將面向用戶的高階查詢陳述句翻譯成可被具體執行的資料處理單元操作,然后執行操作獲取資料,

DataFusion架構

架構詳情

image

DataFusion查詢引擎主要由以下幾部分構成:

  1. 前端
    • 語法決議
    • 語意分析
    • Planner:語法樹轉換成邏輯計劃

主要涉及DFParserSqlToRel這兩個struct

  1. 查詢中間表示
    • Expression(運算式)/ Type system(型別系統)
    • Query Plan / Relational Operators(關系算子)
    • Rewrites / Optimizations(邏輯計劃優化)

主要涉及LogicalPlanExpr這兩個列舉類

  1. 查詢底層表示
    • Statistics(物理計劃算子的統計資訊,輔助物理計劃優化)
    • Partitions(分塊,多執行緒執行物理計劃算子)
    • Sort orders(物理計劃算子對資料是否排序)
    • Algorithms(物理計劃算子的執行演算法,如Hash join和Merge join)
    • Rewrites / Optimizations(物理計劃優化)

主要涉及PyhsicalPlanner這個trait實作的邏輯計劃到物理計劃的轉換,其中主要的關鍵點是ExecutionPlanPhysicalExpr

  1. 執行運行時(算子)
    • 分配資源
    • 向量化計算

主要涉及所有執行算子,如GroupedHashAggregateStream

擴展點

DataFusion查詢引擎的架構還是比較簡單的,其中的擴展點也非常清晰,我們可以從以下幾個方面對DataFusion進行擴展:

用戶自定義函式UDF

無狀態方法

/// 邏輯運算式列舉類
pub enum Expr {
    ...
    ScalarUDF {
        /// The function
        fun: Arc<ScalarUDF>,
        /// List of expressions to feed to the functions as arguments
        args: Vec<Expr>,
    },
    ...
}
/// UDF的邏輯運算式
pub struct ScalarUDF {
    /// 方法名
    pub name: String,
    /// 方法簽名
    pub signature: Signature,
    /// 回傳值型別
    pub return_type: ReturnTypeFunction,
    /// 方法實作
    pub fun: ScalarFunctionImplementation,
}
/// UDF的物理運算式
pub struct ScalarFunctionExpr {
    fun: ScalarFunctionImplementation,
    name: String,
    /// 引數運算式串列
    args: Vec<Arc<dyn PhysicalExpr>>,
    return_type: DataType,
}

用戶自定義聚合函式UADF

有狀態方法

/// 邏輯運算式列舉類
pub enum Expr {
    ...
    AggregateUDF {
        /// The function
        fun: Arc<AggregateUDF>,
        /// List of expressions to feed to the functions as arguments
        args: Vec<Expr>,
        /// Optional filter applied prior to aggregating
        filter: Option<Box<Expr>>,
    },
    ...
}
/// UADF的邏輯運算式
pub struct AggregateUDF {
    /// 方法名
    pub name: String,
    /// 方法簽名
    pub signature: Signature,
    /// 回傳值型別
    pub return_type: ReturnTypeFunction,
    /// 方法實作
    pub accumulator: AccumulatorFunctionImplementation,
    /// 需要保存的狀態的型別
    pub state_type: StateTypeFunction,
}
/// UADF的物理運算式
pub struct AggregateFunctionExpr {
    fun: AggregateUDF,
    args: Vec<Arc<dyn PhysicalExpr>>,
    data_type: DataType,
    name: String,
}

用戶自定義優化規則

Optimizer定義了承載優化規則的結構體,其中optimize方法實作了邏輯計劃優化的程序,優化規則串列中的每個優化規則會被以TOP-DOWNBOTTOM-UP方式作用于邏輯計劃樹,優化規則串列會被實施多個輪次,我們可以通過實作OptimizerRule這個trait來實作自己的優化邏輯,

pub struct Optimizer {
    /// All rules to apply
    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

pub trait OptimizerRule {
    /// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be
    /// optimized by this rule.
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>>;

    ...
}

用戶自定義邏輯計劃算子

/// 邏輯計劃算子列舉類
pub enum LogicalPlan {
    ...
    Extension(Extension),
    ...
}
/// 自定義邏輯計劃算子
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
/// 自定義邏輯計劃算子需要實作的trait
pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { ... }

用戶自定義物理計劃算子

/// 為自定義的邏輯計劃算子`UserDefinedLogcialNode`生成對應的物理計劃算子
pub trait ExtensionPlanner {
    async fn plan_extension(
        &self,
        planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
/// DataFusion默認的邏輯計劃到物理計劃的轉換器提供了自定義轉換程序的結構體
pub struct DefaultPhysicalPlanner {
    extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}
/// 自定義物理計劃算子需要實作的trait
pub trait ExecutionPlan: Debug + Send + Sync { ... }

用戶自定義資料源

可以看出,自定義資料源其實就是生成一個對應的ExecutionPlan執行計劃,這個執行計劃實施的是掃表的任務,如果資料源支持下推的能力,我們在這里可以將projection filters limit等操作下推到掃表時,

/// 自定義資料源需要實作的trait
pub trait TableProvider: Sync + Send {
    ...
    async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    ...
}

用戶自定義元資料

pub trait CatalogProvider: Sync + Send {
    ...
	
    /// 根據名稱獲取Schema
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
    /// 注冊Schema
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        // use variables to avoid unused variable warnings
        let _ = name;
        let _ = schema;
        Err(DataFusionError::NotImplemented(
            "Registering new schemas is not supported".to_string(),
        ))
    }
}

pub trait SchemaProvider: Sync + Send {
    ...
    /// 根據表名獲取資料源
    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
    /// 注冊資料源
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        Err(DataFusionError::Execution(
            "schema provider does not support registering tables".to_owned(),
        ))
    }
    ...
}

邏輯計劃(LogicalPlan)

邏輯計劃其實就是資料流圖,資料從葉子節點流向根節點

let df: DataFrame = ctx.read_table("http_api_requests_total")?
            .filter(col("path").eq(lit("/api/v2/write")))?
            .aggregate([col("status")]), [count(lit(1))])?;

這里我們就使用DataFusion的API介面構造了一個資料流,首先read_table節點會從資料源中掃描資料到記憶體中,然后經過filter節點按照條件進行過濾,最后經過aggregate節點進行聚合,資料流過最后的節點時,就生成了我們需要的資料,

上述鏈式呼叫的API介面實際上并沒有真正執行對資料的操作,這里實際上是使用了建造者模式構造了邏輯計劃樹,最終生成的DataFrame實際上只是包含了一下資訊:

pub struct DataFrame {
    /// 查詢背景關系資訊,包含了元資料,用戶注冊的UDF和UADF,使用的優化器,使用的planner等資訊
    session_state: SessionState,
    /// 邏輯計劃樹的根節點
    plan: LogicalPlan,
}

支持的邏輯計劃算子

點擊查看代碼
Projection
Filter
Window
Aggregate
Sort
Join
TableScan

Repartition
Union
Subquery
Limit
Extension
Distinct

Values
Explain
Analyze
SetVariable
Prepare
Dml(...)

CreateExternalTable
CreateView
CreateCatalogSchema
CreateCatalog
DropTable
DropView

邏輯計劃優化

目標:確保結果相同的情況下,執行更快

image

初始的邏輯計劃,需要經過多個輪次的優化,才能生成執行效率更高的邏輯計劃,DataFusion本身的優化器內置了很多優化規則,用戶也可以擴展自己的優化規則,

內置優化輪次

  1. 下推(Pushdown):減少從一個節點到另一個節點的資料的行列數

    • PushDownProjection
    • PushDownFilter
    • PushDownLimit
  2. 簡化(Simplify):簡化運算式,減少運行時的運算,例如使用布爾代數的法則,將b > 2 AND b > 2簡化成b > 2

    • SimplifyExpressions
    • UnwrapCastInComparison
  3. 簡化(Simplify):洗掉無用的節點

  4. 平鋪子查詢(Flatten Subqueries):將子查詢用join重寫

    • DecorrelateWhereExists
    • DecorrelatedWhereIn
    • ScalarSubqueryToJoin
  5. 優化join:識別join謂詞

    • ExtractEqualJoinPredicate
    • RewriteDisjunctivePredicate
    • FilterNullJoinKeys
  6. 優化distinct

    • SingleDistinctToGroupBy
    • ReplaceDistinctWithAggregate

運算式運算(Expression Evaluation)

假設現在有這樣一個謂詞運算式

path = '/api/v2/write' or path is null

經過語法決議和轉換后,可以用如下運算式樹表示:

image

DataFusion在實施運算式運算時,使用了Arrow提供的向量化計算方法來加速運算

image

物理計劃(ExecutionPlan)

image

呼叫DataFusion提供的DefaultPhysicalPlanner中的create_physical_plan方法,可以將邏輯計劃樹轉換成物理計劃樹,其中物理計劃樹中的每個節點都是一個ExecutionPlan,執行物理計劃樹時,會從根節點開始呼叫execute方法,呼叫該方法還沒有執行對資料的操作,僅僅是將每個物理計劃算子轉換成一個RecordBatchStream算子,形成資料流算子樹,這些RecordBatchStream算子都實作了future包提供的Stream特性,當我們最終呼叫RecordBatchStreamcollect方法時,才會從根節點開始poll一次來獲取一下輪要處理的資料,根節點的poll方法內會呼叫子節點的poll方法,最終每poll一次,整棵樹都會進行一次資料從葉子節點到根節點的流動,生成一個RecordBatch

image

DataFusion實作的物理計劃算子具有以下特性:

  • 異步:避免了阻塞I/O
  • 流式:資料是流式處理的
  • 向量化:每次可以向量化地處理一個RecordBatch
  • 分片:每個算子都可以并行,可以產生多個分片
  • 多核

結語

DataFusion本身只是一個簡單,高效,可擴展的查詢引擎框架,用戶可以將DataFusion作為開發大型資料中臺的基礎組件,也可以輕易地將DataFusion嵌入服務中作為查詢引擎,也可以使用DataFusion構建自己的資料庫系統,如果期望使用分布式的查詢引擎,可以關注基于ArrowDataFusion搭建的分布式查詢引擎Ballista,

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/552588.html

標籤:其他

上一篇:MySQL的varchar存盤原理:InnoDB記錄存盤結構

下一篇:返回列表

標籤雲
其他(159123) Python(38137) JavaScript(25431) Java(18044) C(15226) 區塊鏈(8267) C#(7972) AI(7469) 爪哇(7425) MySQL(7191) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5871) 数组(5741) R(5409) Linux(5340) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4572) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2433) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) .NET技术(1973) 功能(1967) Web開發(1951) HtmlCss(1937) python-3.x(1918) C++(1917) 弹簧靴(1913) xml(1889) PostgreSQL(1877) .NETCore(1860) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • Apache Arrow DataFusion原理與架構

    本篇主要介紹了一種使用Rust語言撰寫的查詢引擎——DataFusion,其使用了基于Arrow格式的記憶體模型,結合Rust語言本身的優勢,達成了非常優秀的性能指標 DataFusion是一個查詢引擎而非資料庫,因此其本身不具備存盤資料的能力。但正因為不依賴底層存盤的格式,使其成為了一個靈活可擴展的 ......

    uj5u.com 2023-05-16 15:30:16 more
  • MySQL的varchar存盤原理:InnoDB記錄存盤結構

    摘要:varchar(M) 能存多少個字符,為什么提示最大16383?innodb怎么知道varchar真正有多長?記錄為NULL,innodb如何處理?某個列資料占用的位元組數非常多怎么辦?影響每行實際可用空間的因素有哪些?本篇圍繞innodb默認行格式dynamic來說說原理。 本文分享自華為云社 ......

    uj5u.com 2023-05-16 15:29:49 more
  • 架構師日記-從資料庫發展歷程到資料結構設計探析

    本文針對資料存盤相關名詞概念進行了解釋,重點介紹了資料庫技術的發展史。為了豐富文章的可讀性以及實用性,又從資料結構設計層面進行了部分技術實戰能力的外延擴展,闡述了拉鏈表,位運算,環形佇列等相關資料結構在軟體開發領域的應用,希望本文給你帶來識訓。 ......

    uj5u.com 2023-05-16 15:29:42 more
  • MySQL 存盤程序&觸發器&事務

    存盤程序 概念 存盤程序(Stored Procedure),是為了完成特定功能的SQL陳述句集。 優點 存盤程序可以理解為shell腳本這型別的命令集輸出工具,但是在底層,存盤程序擁有更多的優點: ==語言的靈活性跟功能性更強==,在原有基礎之上可以插入控制陳述句、回圈陳述句等讓SQL陳述句的功能更強,能 ......

    uj5u.com 2023-05-16 15:29:30 more
  • pg_enterprise_views偶然發現的PG神仙插件!

    一直從事資料庫相關的作業,對于PG而言最大的問題其實是在運維管理方面,其缺乏有效且直觀成體系的系統表,苦覓良久,今日在PG官網中發現了一款新收錄的免費插件,其提供了數十張系統表,內容涵蓋了從作業系統到資料庫的負載指標、等待事件、會話、客戶端、SQL、SQL執行計劃、超時鎖、長事務、資料庫物件、寫行程 ......

    uj5u.com 2023-05-16 15:28:49 more
  • Redis實戰解讀-初識Redis&Redis基本資料型別

    一.初識Redis
    1.什么是Redis
    ? Redis是一個速度非常快的非關系型資料庫(non-relational database),它可以存盤鍵(key)與五種不同型別的值的映射(mapping),可以將存盤在記憶體的鍵值對資料持久化到磁盤,可以使用復制特性來擴展讀性能,也可以采用客戶端分片來... ......

    uj5u.com 2023-05-16 15:28:18 more
  • Redis資料結構二之SDS和雙向鏈表

    本文首發于公眾號:Hunter后端 原文鏈接:Redis資料結構二之SDS和雙向鏈表 這一篇筆記介紹一下 SDS(simple dynamic string)和雙向鏈表。 以下是本篇筆記目錄: SDS 常數復雜度獲取字串長度 杜絕緩沖區溢位 減少修改字串帶來的記憶體重分配次數 二進制安全 兼容C字 ......

    uj5u.com 2023-05-16 15:28:08 more
  • MySQL 8.0不再擔心被垃圾SQL搞爆記憶體

    MySQL 8.0.28引入的新功能 MySQL 8.0.28開始,新增一個特性,支持監控統計并限制各個連接(會話)的記憶體消耗,避免大量用戶連接因為執行垃圾SQL消耗過多記憶體,造成可能被OOM kill的風險。 首先,需要先設定系統選項 global_connection_memory_tracki ......

    uj5u.com 2023-05-16 15:27:50 more
  • 06~12-Esp8266物聯網芯片的使用(一)-part02/03-ESP8266開發環境、

    上一章主要作了芯片介紹,這一章主要作對開發環境的介紹。 認識Arduino Arduino是一款便捷靈活、方便上手的開源電子原型平臺。包含硬體(各種型號的Arduino板)和軟體(ArduinoIDE)。它構建于開放原始碼simple I/O介面版,并且具有使用類似Java、C語言的Processi ......

    uj5u.com 2023-05-16 15:22:35 more
  • MySQL的varchar存盤原理:InnoDB記錄存盤結構

    摘要:varchar(M) 能存多少個字符,為什么提示最大16383?innodb怎么知道varchar真正有多長?記錄為NULL,innodb如何處理?某個列資料占用的位元組數非常多怎么辦?影響每行實際可用空間的因素有哪些?本篇圍繞innodb默認行格式dynamic來說說原理。 本文分享自華為云社 ......

    uj5u.com 2023-05-16 15:16:31 more