feat: add remote table materialization #8
base: dap-main
Conversation
@@ -117,8 +117,7 @@ def convert_time_type(cls, agate_table: "agate.Table", col_idx: int) -> str:
    @available.parse(lambda *a, **k: {})
    def get_clickhouse_cluster_name(self):
        conn = self.connections.get_if_exists()
        if conn.credentials.cluster:
            return f'"{conn.credentials.cluster}"'
@@ -32,7 +32,7 @@
    db.engine as db_engine,
{%- if adapter.get_clickhouse_cluster_name() -%}
    count(distinct _shard_num) > 1 as is_on_cluster
-   from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t
+   from clusterAllReplicas('{{ adapter.get_clickhouse_cluster_name() }}', system.tables) as t
All three variants work:
select * from cluster(default, system.one);
select * from cluster('default', system.one);
select * from cluster("default", system.one);
I was worried about passing reserved names to the first parameter
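A small sketch of the concern, using a hypothetical render helper: unquoted, the cluster name is parsed as an identifier and could collide with a reserved word, while single quotes make it an unambiguous string literal.

```python
# Hypothetical helper mirroring the template change: wrap the cluster name
# in single quotes so ClickHouse reads it as a string literal rather than
# an identifier that might clash with a reserved word.
def render_source(cluster_name: str) -> str:
    return f"clusterAllReplicas('{cluster_name}', system.tables) as t"

print(render_source("default"))
# clusterAllReplicas('default', system.tables) as t
```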
# this `remote_table` run with the default configuration will point to the
# previously created local table, taking `local_db_prefix` and `local_suffix`
# settings into account.
run_dbt()
create or replace table `dbt_clickhouse_3961_test_remote_table_1736978435032`.`remote_table`
(key1 UInt64, key2 Int64)
engine = Distributed('test_shard', 'dbt_clickhouse_3961_test_remote_table_1736978435032', 'remote_table_local', rand())
I copied this DDL from test debugging, and I have two questions.
- What is the use case for creating a Distributed table on `test_shard` when the base table is also on `test_shard`? Wouldn't this simply overwrite the Distributed table created by the `distributed_incremental` materialization?
- I didn't dig deeper, but `on_cluster_clause` returned nothing in this case; I believe this query should have an `on cluster test_shard` clause, otherwise the Distributed table will only be available on one shard.
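For reference, a sketch of what the reviewer expects from `on_cluster_clause` (illustrative Python; the real `on_cluster_clause` is a Jinja macro in the adapter): an `on cluster …` fragment whenever a cluster is configured, and an empty string otherwise.

```python
# Illustrative only: mirrors the behavior the comment above expects from
# the adapter's on_cluster_clause macro.
def on_cluster_clause(cluster):
    return f"on cluster {cluster}" if cluster else ""

ddl = (
    "create or replace table db.`remote_table` "
    + on_cluster_clause("test_shard")
    + " engine = Distributed(...)"
)
print(ddl)
# create or replace table db.`remote_table` on cluster test_shard engine = Distributed(...)
```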
{%- set remote_config = config.get('remote_config', {}) -%}
{%- set remote_cluster = remote_config.get('cluster') or adapter.get_clickhouse_cluster_name() -%}
{%- set remote_schema = remote_config.get('schema') or adapter.get_clickhouse_local_db_prefix() + this.schema -%}
{%- set remote_identifier = remote_config.get('identifier') or this.identifier + adapter.get_clickhouse_local_suffix() -%}
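The fallback precedence in these `set` lines can be sketched in plain Python (the function and argument names are illustrative, not the adapter's API): an explicit `remote_config` value always wins, otherwise the adapter-level defaults are applied.

```python
# Illustrative mirror of the Jinja fallbacks above: explicit remote_config
# values take precedence; missing keys fall back to the adapter settings.
def resolve_remote(remote_config, default_cluster, local_db_prefix,
                   this_schema, this_identifier, local_suffix):
    cluster = remote_config.get("cluster") or default_cluster
    schema = remote_config.get("schema") or local_db_prefix + this_schema
    identifier = remote_config.get("identifier") or this_identifier + local_suffix
    return cluster, schema, identifier

# With an empty remote_config, every value falls back to the defaults:
print(resolve_remote({}, "test_shard", "local_", "db", "remote_table", "_local"))
# ('test_shard', 'local_db', 'remote_table_local')
```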
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will `remote_config.get('schema')` come with the leading underscore, and `remote_config.get('identifier')` with the `_base` suffix, in case the base table is on the ingestion cluster?
""")

# the created distributed table should point to a local table as defined in the model's `remote_config`
run_dbt()
create or replace table `dbt_clickhouse_3076_test_remote_table_1736979527642`.`remote_table`
(key1 UInt64, key2 Int64)
engine = Distributed('test_remote', 'dbt_clickhouse_3076_test_remote_table_1736979527642', 'remote_target_table', key1)
Is the `on cluster test_shard` clause also missing here?
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% call statement('main') %}
    {{ create_distributed_table(target_relation, remote_relation, sql) }}
Where is `sql` defined?
{%- set remote_cluster = local_relation.remote_cluster or adapter.get_clickhouse_cluster_name() -%}
{%- set sharding = config.get('sharding_key') -%}
{%- set reference = "as " ~ local_relation -%}
{%- if sql -%}
What is the use case for passing `sql` when creating a distributed table?
With `sql`, you don't rely on an existing local relation for the DDL query; it allows passing the column names and types directly.
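A rough sketch of the two paths in plain Python (the real logic is the Jinja macro shown above; the exact DDL rendering here, including the `empty as (…)` form, is an assumption, not the macro's confirmed output):

```python
# Illustrative branch: with `sql`, columns come from the query itself and no
# existing table is needed; without it, the DDL clones the local relation's
# structure via an `as <relation>` reference, as set in the macro above.
def structure_reference(sql, local_relation):
    if sql:
        # hypothetical rendering; the actual macro may derive columns differently
        return f"empty as ({sql})"
    return f"as {local_relation}"

print(structure_reference(None, "db.remote_table_local"))
# as db.remote_table_local
print(structure_reference("select 1 as key1", None))
# empty as (select 1 as key1)
```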
Materialization strategy for creating a Distributed-engine table with an arbitrary base table target.