From a8dc84c4570aeb2d3aba1322251665325ae33f75 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 24 Jan 2025 09:00:22 +0800 Subject: [PATCH 1/2] add re-produce case --- .../cloud_cumulative_compaction_policy.cpp | 15 +++ be/src/cloud/cloud_full_compaction.cpp | 3 + .../test_cloud_full_compaction_do_lease.out | 4 + .../doris/regression/suite/Suite.groovy | 18 +++ ...test_cloud_full_compaction_do_lease.groovy | 118 ++++++++++++++++++ 5 files changed, 158 insertions(+) create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.out create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp index b93888e20311ad..cc5037686606a0 100644 --- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp +++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp @@ -55,6 +55,21 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector* input_rowsets, Version* last_delete_version, size_t* compaction_score, bool allow_delete) { + DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", { + auto target_tablet_id = dp->param("tablet_id", -1); + if (target_tablet_id == tablet->tablet_id()) { + auto start_version = dp->param("start_version", -1); + auto end_version = dp->param("end_version", -1); + for (auto& rowset : candidate_rowsets) { + if (rowset->start_version() >= start_version && + rowset->end_version() <= end_version) { + input_rowsets->push_back(rowset); + } + } + } + return input_rowsets->size(); + }) + size_t promotion_size = cloud_promotion_size(tablet); auto max_version = tablet->max_version().first; int transient_size = 0; diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 
bce00c9a2e74f6..f983e57ebe082b 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -30,6 +30,7 @@ #include "olap/rowset/beta_rowset.h" #include "olap/tablet_meta.h" #include "service/backend_options.h" +#include "util/debug_points.h" #include "util/thread.h" #include "util/uuid_generator.h" #include "vec/columns/column.h" @@ -221,6 +222,8 @@ Status CloudFullCompaction::modify_rowsets() { compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size()); compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size()); + DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.block", DBUG_BLOCK); + DeleteBitmapPtr output_rowset_delete_bitmap = nullptr; if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.out new file mode 100644 index 00000000000000..6e498c12c60abf --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +10 + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 3d616654057efe..660b1fff408760 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1781,6 +1781,24 @@ class Suite implements GroovyInterceptable { } } + void setBeConfigTemporary(Map tempConfig, Closure actionSupplier) { + Map> originConf = Maps.newHashMap() + tempConfig.each{ k, v -> + originConf.put(k, get_be_param(k)) + } + try { + tempConfig.each{ k, v -> set_be_param(k, v)} + actionSupplier() + } catch (Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + originConf.each { k, confs -> + set_original_be_param(k, confs) + } + } + } + void waitAddFeFinished(String host, int port) { logger.info("waiting for ${host}:${port}") Awaitility.await().atMost(60, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).and() diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy new file mode 100644 index 00000000000000..685af405c9da08 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert + +suite("test_cloud_full_compaction_do_lease","nonConcurrent") { + if (!isCloudMode()) { + return + } + + def tableName = "test_cloud_full_compaction_do_lease" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE ${tableName} + (k int, v1 int, v2 int ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (k) + BUCKETS 1 PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write"="true", + "disable_auto_compaction" = "true"); + """ + + (1..20).each{ id -> + sql """insert into ${tableName} select number, number, number from numbers("number"="10");""" + } + + qt_sql "select count(1) from ${tableName};" + + def backends = sql_return_maparray('show backends') + def tabletStats = sql_return_maparray("show tablets from ${tableName};") + assert tabletStats.size() == 1 + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def customBeConfig = [ + lease_compaction_interval_seconds : 2 + ] + + setBeConfigTemporary(customBeConfig) { + try { + // block the full compaction + GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block") + + 
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", + [tablet_id:"${tabletId}", start_version:"2", end_version:"10"]); + + { + // trigger full compaction, it will be blocked in modify_rowsets + logger.info("trigger full compaction on BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assert code == 0 + def compactJson = parseJson(out.trim()) + assert "success" == compactJson.status.toLowerCase() + } + + // wait until the full compaction job's lease timeout(lease_compaction_interval_seconds * 4) + Thread.sleep(10000); + + { + // trigger cumu compaction, it will be blokced in modify_rowsets + logger.info("trigger cumu compaction on BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assert code == 0 + def compactJson = parseJson(out.trim()) + assert "success" == compactJson.status.toLowerCase() + } + + Thread.sleep(1000); + + // unblock full compaction + GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block") + + Thread.sleep(1000); + + { + def (code, out, err) = be_show_tablet_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + assert code == 0 + def compactJson = parseJson(out.trim()) + assert compactJson["rowsets"].toString().contains("[2-10]") + } + + + } catch (Exception e) { + logger.info(e.getMessage()) + assert false + } finally { + GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block") + GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets") + 
} + } +} From 52e0f38398fecd5145130adcc9fc278ff07bc339 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 24 Jan 2025 20:27:05 +0800 Subject: [PATCH 2/2] fix and fix case --- .../cloud_cumulative_compaction_policy.cpp | 27 ++++++++++--------- be/src/cloud/cloud_storage_engine.cpp | 9 +++++++ ...test_cloud_full_compaction_do_lease.groovy | 13 ++++++--- 3 files changed, 32 insertions(+), 17 deletions(-) diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp index cc5037686606a0..100556de97078c 100644 --- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp +++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp @@ -55,20 +55,21 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector* input_rowsets, Version* last_delete_version, size_t* compaction_score, bool allow_delete) { - DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", { - auto target_tablet_id = dp->param("tablet_id", -1); - if (target_tablet_id == tablet->tablet_id()) { - auto start_version = dp->param("start_version", -1); - auto end_version = dp->param("end_version", -1); - for (auto& rowset : candidate_rowsets) { - if (rowset->start_version() >= start_version && - rowset->end_version() <= end_version) { - input_rowsets->push_back(rowset); + DBUG_EXECUTE_IF( + "CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", { + auto target_tablet_id = dp->param("tablet_id", -1); + if (target_tablet_id == tablet->tablet_id()) { + auto start_version = dp->param("start_version", -1); + auto end_version = dp->param("end_version", -1); + for (auto& rowset : candidate_rowsets) { + if (rowset->start_version() >= start_version && + rowset->end_version() <= end_version) { + input_rowsets->push_back(rowset); + } + } } - } - } - return input_rowsets->size(); - }) + return 
input_rowsets->size(); + }) size_t promotion_size = cloud_promotion_size(tablet); auto max_version = tablet->max_version().first; diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 766f83563f729a..f71277d5983525 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -762,6 +762,7 @@ Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet, void CloudStorageEngine::_lease_compaction_thread_callback() { while (!_stop_background_threads_latch.wait_for( std::chrono::seconds(config::lease_compaction_interval_seconds))) { + std::vector> full_compactions; std::vector> base_compactions; std::vector> cumu_compactions; { @@ -776,8 +777,16 @@ void CloudStorageEngine::_lease_compaction_thread_callback() { cumu_compactions.push_back(cumu); } } + for (auto& [_, full] : _submitted_full_compactions) { + if (full) { + full_compactions.push_back(full); + } + } } // TODO(plat1ko): Support batch lease rpc + for (auto& comp : full_compactions) { + comp->do_lease(); + } for (auto& comp : cumu_compactions) { comp->do_lease(); } diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy index 685af405c9da08..1910788d92f081 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy @@ -62,6 +62,10 @@ suite("test_cloud_full_compaction_do_lease","nonConcurrent") { ] setBeConfigTemporary(customBeConfig) { + // the default value of lease_compaction_interval_seconds is 20s, which means + // the compaction lease thread will sleep for 20s first, we sleep 20s in case + // so that compaction lease thread can be scheduled as we expect(2s) + Thread.sleep(20000) try { // block the full compaction 
GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block") @@ -83,13 +87,14 @@ suite("test_cloud_full_compaction_do_lease","nonConcurrent") { Thread.sleep(10000); { - // trigger cumu compaction, it will be blokced in modify_rowsets + // trigger cumu compaction logger.info("trigger cumu compaction on BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") def (code, out, err) = be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) assert code == 0 def compactJson = parseJson(out.trim()) - assert "success" == compactJson.status.toLowerCase() + // this will fail due to existing full compaction + assert "e-2000" == compactJson.status.toLowerCase() } Thread.sleep(1000); @@ -97,13 +102,13 @@ suite("test_cloud_full_compaction_do_lease","nonConcurrent") { // unblock full compaction GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block") - Thread.sleep(1000); + Thread.sleep(3000); { def (code, out, err) = be_show_tablet_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) assert code == 0 def compactJson = parseJson(out.trim()) - assert compactJson["rowsets"].toString().contains("[2-10]") + assert compactJson["rowsets"].toString().contains("[2-21]") }