主頁 > 資料庫 > ElasticSearch - 批量更新bulk死鎖問題排查

ElasticSearch - 批量更新bulk死鎖問題排查

2023-07-06 09:14:09 資料庫

一、問題系統介紹

  1. 監聽商品變更MQ訊息,查詢商品最新的資訊,呼叫BulkProcessor批量更新ES集群中的商品欄位資訊;

  2. 由于商品資料非常多,所以將商品資料存盤到ES集群上,整個ES集群共劃分了256個分片,并根據商品的三級類目ID進行分片路由,

比如一個SKU的商品名稱發生變化,我們就會收到這個SKU的變更MQ訊息,然后再去查詢商品介面,將商品的最新名稱查詢回來,再根據這個SKU的三級分類ID進行路由,找到對應的ES集群分片,然后更新商品名稱欄位資訊,

由于商品變更MQ訊息量巨大,為了提升更新ES的性能,防止出現MQ訊息積壓問題,所以本系統使用了BulkProcessor進行批量異步更新,

ES客戶端版本如下:

        <dependency>
            <artifactId>elasticsearch-rest-client</artifactId>
            <groupId>org.elasticsearch.client</groupId>
            <version>6.5.3</version>
        </dependency>

BulkProcessor配置偽代碼如下:

        //在這里呼叫build()方法構造bulkProcessor,在底層實際上是用了bulk的異步操作
        this.fullDataBulkProcessor = BulkProcessor.builder((request, bulkListener) ->
                fullDataEsClient.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
                // 1000條資料請求執行一次bulk
                .setBulkActions(1000)
                // 5mb的資料重繪一次bulk
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                // 并發請求數量, 0不并發, 1并發允許執行
                .setConcurrentRequests(1)
                // 固定1s必須重繪一次
                .setFlushInterval(TimeValue.timeValueSeconds(1L))
                // 重試5次,間隔1s
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();

二、問題怎么發現的

  1. 618大促開始后,由于商品變更MQ訊息非常頻繁,MQ訊息每天的訊息量更是達到了日常的數倍,而且好多商品還變更了三級類目ID;

  2. 系統在更新這些三級類目ID發生變化的SKU商品資訊時,根據修改后的三級類目ID路由后的分片更新商品資訊時發生了錯誤,并且重試了5次,依然沒有成功;

  3. 因為在新路由的分片上沒有這個商品的索引資訊,這些更新請求永遠也不會執行成功,系統的日志檔案中也記錄了大量的例外重試日志,

  4. 商品變更MQ訊息也開始出現了積壓報警,MQ訊息的消費速度明顯趕不上生產速度,

  5. 觀察MQ訊息消費者的UMP監控資料,發現消費性能很平穩,沒有明顯波動,但是呼叫次數會在系統消費MQ一段時間后出現斷崖式下降,由原來的每分鐘幾萬呼叫量逐漸下降到個位數,

  6. 在重啟應用后,系統又開始消費,UMP監控呼叫次數恢復到正常水平,但是系統運行一段時間后,還是會出現消費暫停問題,仿佛所有消費執行緒都被暫停了一樣,

三、排查問題的詳細程序

首先找一臺暫停消費MQ訊息的容器,查看應用行程ID,使用jstack命令dump應用行程的整個執行緒堆疊資訊,將匯出的執行緒堆疊資訊打包上傳到 https://fastthread.io/ 進行執行緒狀態分析,分析報告如下:

通過分析報告發現有124個處于BLOCKED狀態的執行緒,然后可以點擊查看各執行緒的詳細堆疊資訊,堆疊資訊如下:

連續查看多個執行緒的詳細堆疊資訊,MQ消費執行緒都是在waiting to lock <0x00000005eb781b10> (a org.elasticsearch.action.bulk.BulkProcessor),然后根據0x00000005eb781b10去搜索發現,這個物件鎖正在被另外一個執行緒占用,占用執行緒堆疊資訊如下:

這個執行緒狀態此時正處于WAITING狀態,通過執行緒名稱發現,該執行緒應該是ES客戶端內部執行緒,正是該執行緒搶占了業務執行緒的鎖,然后又在等待其他條件觸發該執行緒執行,所以導致了所有的MQ消費業務執行緒一直無法獲取BulkProcessor內部的鎖,導致出現了消費暫停問題,

但是這個執行緒elasticsearch[scheduler][T#1]為啥不能執行? 它是什么時候啟動的? 又有什么作用?

就需要我們對BulkProcessor進行深入分析,由于BulkProcessor是通過builder模塊進行創建的,所以深入builder原始碼,了解一下BulkProcessor的創建程序,

public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
        Objects.requireNonNull(consumer, "consumer");
        Objects.requireNonNull(listener, "listener");
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
        return new Builder(consumer, listener,
                (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
                () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
    }

內部創建了一個時間調度執行執行緒池,執行緒命名規則和上述持有鎖的執行緒名稱相似,具體代碼如下:

static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
                EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }

最后在build方法內部執行了BulkProcessor的內部有參構造方法,在構造方法內部啟動了一個周期性執行的flushing任務,代碼如下

 BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                  int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
                  Scheduler scheduler, Runnable onClose) {
        this.bulkActions = bulkActions;
        this.bulkSize = bulkSize.getBytes();
        this.bulkRequest = new BulkRequest();
        this.scheduler = scheduler;
        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
        // Start period flushing task after everything is setup
        this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
        this.onClose = onClose;
    }
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
        if (flushInterval == null) {
            return new Scheduler.Cancellable() {
                @Override
                public void cancel() {}

                @Override
                public boolean isCancelled() {
                    return true;
                }
            };
        }
        final Runnable flushRunnable = scheduler.preserveContext(new Flush());
        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
    }
class Flush implements Runnable {

        @Override
        public void run() {
            synchronized (BulkProcessor.this) {
                if (closed) {
                    return;
                }
                if (bulkRequest.numberOfActions() == 0) {
                    return;
                }
                execute();
            }
        }
    }

通過源代碼發現,該flush任務就是在創建BulkProcessor物件時設定的固定時間flush邏輯,當setFlushInterval方法引數生效,就會啟動一個后臺定時flush任務,flush間隔,由setFlushInterval方法引數定義,該flush任務在運行期間,也會搶占BulkProcessor物件鎖,搶到鎖后,才會執行execute方法,具體的方法呼叫關系源代碼如下:

/**
     * Adds the data from the bytes to be processed by the bulk processor
     */
    public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
                                          @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
        bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
        executeIfNeeded();
        return this;
    }

    private void executeIfNeeded() {
        ensureOpen();
        if (!isOverTheLimit()) {
            return;
        }
        execute();
    }

    // (currently) needs to be executed under a lock
    private void execute() {
        final BulkRequest bulkRequest = this.bulkRequest;
        final long executionId = executionIdGen.incrementAndGet();

        this.bulkRequest = new BulkRequest();
        this.bulkRequestHandler.execute(bulkRequest, executionId);
    }

而上述代碼中的add方法,則是由MQ消費業務執行緒去呼叫,在該方法上同樣有一個synchronized關鍵字,所以消費MQ業務執行緒會和flush任務執行執行緒直接會存在鎖競爭關系,具體MQ消費業務執行緒呼叫偽代碼如下:

 @Override
 public void upsertCommonSku(CommonSkuEntity commonSkuEntity) {
            String source = JsonUtil.toString(commonSkuEntity);
            UpdateRequest updateRequest = new UpdateRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            updateRequest.doc(source, XContentType.JSON);
            IndexRequest indexRequest = new IndexRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            indexRequest.source(source, XContentType.JSON);
            updateRequest.upsert(indexRequest);
            updateRequest.routing(commonSkuEntity.getCat3().toString());
            fullbulkProcessor.add(updateRequest);
}  

通過以上對執行緒堆疊分析,發現所有的業務執行緒都在等待elasticsearch[scheduler][T#1]執行緒釋放BulkProcessor物件鎖,但是該執行緒確一直沒有釋放該物件鎖,從而出現了業務執行緒的死鎖問題,

結合應用日志檔案中出現的大量例外重試日志,可能與BulkProcessor的例外重試策略有關,然后進一步了解BulkProcessor的例外重試代碼邏輯,由于業務執行緒中提交BulkRequest請求都統一提交到了BulkRequestHandler物件中的execute方法內部進行處理,代碼如下:

public final class BulkRequestHandler {
    private final Logger logger;
    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
    private final BulkProcessor.Listener listener;
    private final Semaphore semaphore;
    private final Retry retry;
    private final int concurrentRequests;

    BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
                       BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
        assert concurrentRequests >= 0;
        this.logger = Loggers.getLogger(getClass());
        this.consumer = consumer;
        this.listener = listener;
        this.concurrentRequests = concurrentRequests;
        this.retry = new Retry(backoffPolicy, scheduler);
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
    }

    public void execute(BulkRequest bulkRequest, long executionId) {
        Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {
            listener.beforeBulk(executionId, bulkRequest);
            semaphore.acquire();
            toRelease = semaphore::release;
            CountDownLatch latch = new CountDownLatch(1);
            retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse response) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, response);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, e);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }
            });
            bulkRequestSetupSuccessful = true;
            if (concurrentRequests == 0) {
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
                toRelease.run();
            }
        }
    }

    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
            semaphore.release(this.concurrentRequests);
            return true;
        }
        return false;
    }
}

BulkRequestHandler通過構造方法初始化了一個Retry任務物件,該物件中也傳入了一個Scheduler,且該物件和flush任務中傳入的是同一個執行緒池,該執行緒池內部只維護了一個固定執行緒,而execute方法首先會先根據Semaphore來控制并發執行數量,該并發數量在構建BulkProcessor時通過引數指定,通過上述配置發現該值配置為1,所以每次只允許一個執行緒執行該方法,即MQ消費業務執行緒和flush任務執行緒,同一時間只能有一個執行緒可以執行,然后下面在了解一下重試任務是如何執行的,具體看如下代碼:

 public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
                            ActionListener<BulkResponse> listener) {
        RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, scheduler);
        r.execute(bulkRequest);
    }

RetryHandler內部會執行提交bulkRequest請求,同時也會監聽bulkRequest執行例外狀態,然后執行任務重試邏輯,重試代碼如下:

private void retry(BulkRequest bulkRequestForRetry) {
            assert backoff.hasNext();
            TimeValue next = backoff.next();
            logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
            Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
            scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
        }

RetryHandler將執行失敗的bulk請求重新交給了內部scheduler執行緒池去執行,通過以上代碼了解,該執行緒池內部只維護了一個固定執行緒,同時該執行緒池可能還會被另一個flush任務去占用執行,所以如果重試邏輯正在執行的時候,此時執行緒池內的唯一執行緒正在執行flush任務,則會阻塞重試邏輯執行,重試邏輯不能執行完成,則不會釋放Semaphore,但是由于并發數量配置的是1,所以flush任務執行緒需要等待其他執行緒釋放一個Semaphore許可后才能繼續執行,所以此處形成了回圈等待,導致Semaphore和BulkProcessor物件鎖都無法釋放,從而使得所有的MQ消費業務執行緒都阻塞在獲取BulkProcessor鎖之前,

同時,在GitHub的ES客戶端原始碼客戶端上也能搜索到類似問題,例如: https://github.com/elastic/elasticsearch/issues/47599 ,所以更加印證了之前的猜想,就是因為bulk的不斷重試從而引發了BulkProcessor內部的死鎖問題,

四、如何解決問題

既然前邊已經了解到了問題產生的原因,所以就有了如下幾種解決方案:

1.升級ES客戶端版本到7.6正式版,后續版本通過將例外重試任務執行緒池和flush任務執行緒池進行了物理隔離,從而避免了執行緒池的競爭,但是需要考慮版本兼容性,

2.由于該死鎖問題是由大量例外重試邏輯引起的,可以在不影響業務邏輯的情況取消重試邏輯,該方案可以不需要升級客戶端版本,但是需要評估業務影響,執行失敗的請求可以通過其他其他方式進行業務重試,

如有疏漏不妥之處,歡迎指正!

作者:京東零售 曹志飛

來源:京東云開發者社區

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/556656.html

標籤:其它

上一篇:Mysql進階篇(一)之存盤引擎

下一篇:返回列表

標籤雲
其他(162108) Python(38266) JavaScript(25524) Java(18290) C(15238) 區塊鏈(8275) C#(7972) AI(7469) 爪哇(7425) MySQL(7288) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5876) 数组(5741) R(5409) Linux(5347) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4611) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2438) ASP.NET(2404) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) HtmlCss(1989) .NET技术(1985) 功能(1967) Web開發(1951) C++(1942) python-3.x(1918) 弹簧靴(1913) xml(1889) PostgreSQL(1882) .NETCore(1863) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • ElasticSearch - 批量更新bulk死鎖問題排查

    由于商品變更MQ訊息量巨大,為了提升更新ES的性能,防止出現MQ訊息積壓問題,所以本系統使用了BulkProcessor進行批量異步更新。 ......

    uj5u.com 2023-07-06 09:14:09 more
  • Mysql進階篇(一)之存盤引擎

    # 一. MySQL體系結構 ![](https://tcs-devops.aliyuncs.com/storage/112v957e3962f4a8a6d4d8eb1a194d885fa0?Signature=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJBcHB ......

    uj5u.com 2023-07-06 09:13:13 more
  • MySQL的match函式在sp中使用的BUG決議

    ## 一、問題發現 在一次開發中在sp中使用`MySQL PREPARE`以后,使用`match AGAINST`陳述句作為`prepare stmt`的引數后,發現執行第二遍call會導致資料庫crash,于是開始動手調查問題發生的原因。 > 注:本次使用的 MySQL 資料庫版本為最新的debug ......

    uj5u.com 2023-07-06 09:12:50 more
  • 分布式資料庫 Join 查詢設計與實作淺析

    相對于單例資料庫的查詢操作,分布式資料查詢會有很多技術難題。本文記錄 Mysql 分庫分表 和 Elasticsearch Join 查詢的實作思路,了解分布式場景資料處理的設計方案。

    文章從常用的關系型資料庫 MySQL 的分庫分表Join 分析,再到非關系型 ElasticSearch 來分析... ......

    uj5u.com 2023-07-06 09:12:42 more
  • Spark的一些重要概念

    # Shuffle的深入理解 什么是Shuffle,本意為洗牌,在資料處理領域里面,意為將數打散。 問題:shuffle一定有網路傳輸嗎?有網路傳輸的一定是Shuffle嗎? ## Shuffle的概念 通過網路將資料傳輸到多臺機器,資料被打散,但是有網路傳輸,不一定就有shuffle,Shuffl ......

    uj5u.com 2023-07-06 09:12:15 more
  • 基于袋鼠云實時開發平臺開發 FlinkSQL 任務的實踐探索

    隨著業務的發展,[實時場景](https://www.dtstack.com/dtinsight/streamworks?src=https://www.cnblogs.com/DTinsight/p/szsm)在各個?業中變得越來越重要。?論是?融、電商還是物流,實時資料處理都成為了其中的關鍵環節。Flink 憑借其強?的[流處理特性](https://www.dts ......

    uj5u.com 2023-07-06 09:11:54 more
  • ORA-20000: Unable to set values for index xxx: does not exis

    使用expdp/impdp匯出匯入資料時,遇到ORA-2000錯誤,如下所示: Processing object type SCHEMA_EXPORT/TABLE/GRANT/OWNER_GRANT/OBJECT_GRANTProcessing object type SCHEMA_EXPORT/ ......

    uj5u.com 2023-07-05 09:04:50 more
  • “遠程客戶端操作hdfs創建檔案夾”,驗證環境是否配置成功,以及HDFS

    文章中包含我所遇到的錯誤,進行了HDFS錯誤整改,以及后面有操作創建“遠程客戶端操作hdfs創建檔案夾”,驗證環境是否配置成功的程序。 ......

    uj5u.com 2023-07-05 09:04:09 more
  • 數倉性能調優:大寬表關聯MERGE性能優化

    摘要:本文主要為大家講解在數倉性能調優程序中,關于大寬表關聯MERGE性能優化程序。 本文分享自華為云社區《GaussDB(DWS)性能調優:大寬表關聯MERGE性能優化》,作者:譡里個檔。 【業務背景】 如下MERGE陳述句執行耗時長達2034s MERGE INTO sdifin.hah_ae_l ......

    uj5u.com 2023-07-05 09:03:43 more
  • 數倉性能調優:大寬表關聯MERGE性能優化

    摘要:本文主要為大家講解在數倉性能調優程序中,關于大寬表關聯MERGE性能優化程序。 本文分享自華為云社區《GaussDB(DWS)性能調優:大寬表關聯MERGE性能優化》,作者:譡里個檔。 【業務背景】 如下MERGE陳述句執行耗時長達2034s MERGE INTO sdifin.hah_ae_l ......

    uj5u.com 2023-07-05 09:02:32 more