samples limit is not honored by failed rows checks #1985

Closed

denisdallinga opened this issue Jan 9, 2024 · 5 comments

Comments

denisdallinga commented Jan 9, 2024

We discovered an issue with sample limits not being applied when using failed row checks.

checks for my_db.my_table [daily]:
    - failed rows:
        samples limit: 5
        # Any row that matches this dataset filter fails
        fail condition: 1 = 1
        name: test check

Some things we've observed with a configuration similar to the one above:

  • Even though the samples limit is explicitly defined in the check, the sampler collects the complete dataset instead of applying the limit of 5 specified in the definition.
  • The bug occurs with both fail condition and fail query.
  • If the samples limit is left out of the definition, the whole dataset is still collected, rather than the default of 100 rows mentioned in the documentation.
  • Other types of checks do apply the samples limit correctly; we've verified this with failed_count and missing_count checks, where the LIMIT can be seen in the SQL queries generated by Soda (see the illustrative query shapes after this list).
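
For illustration, the difference looks roughly like this. The query shapes below are assumptions reconstructed from the observations above, not Soda's actual generated SQL, and my_column is a hypothetical column name:

# Assumed shape for a missing_count check: the sample query carries the limit.
missing_count_sample_sql = "SELECT * FROM my_db.my_table WHERE my_column IS NULL LIMIT 5"

# Assumed shape for the failed rows check: the configured condition runs
# as-is, with no LIMIT appended, so the sampler receives every failed row.
failed_rows_sql = "SELECT * FROM my_db.my_table WHERE 1 = 1"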

Confirmed not working on Soda Core 3.1.2

Mentioned in the soda-core channel in the Soda community Slack.

To reproduce the issue:

from pyspark.sql import SparkSession
from soda.scan import Scan
from soda.sampler.sampler import Sampler, SampleContext

# custom sampler that prints the number of rows being sampled
class CountRowsSampler(Sampler):
    def store_sample(self, sample_context: SampleContext):
        print(f'sample size: {len(sample_context.sample.get_rows())}')

spark_session = SparkSession.builder.config('spark.app.name', 'pytest-spark').getOrCreate()

# 101 entries, to show that the sampler honors neither the explicit limit (5) nor the default limit (100)
data = [['1']] * 101
df = spark_session.createDataFrame(data=data, schema=['number_one'])
df.createOrReplaceTempView('dataframe')

# set up the scan
scan = Scan()
scan.add_spark_session(spark_session=spark_session)
scan.set_data_source_name('spark_df')

check = """
checks for dataframe:
    - failed rows:
        samples limit: 5
        # Any row that matches this dataset filter fails
        fail condition: 1 = 1
"""
scan.add_sodacl_yaml_str(check)
scan.sampler = CountRowsSampler()

scan.execute()

prints:

sample size: 101

With the samples limit honored, this would print sample size: 5 (or sample size: 100 under the documented default).
tools-soda commented:

SAS-2664

denisdallinga added a commit to denisdallinga/soda-core that referenced this issue Jan 11, 2024
This changes the way samples are collected from
UserDefinedFailedRowsExpressionQueries. In order to apply the samples
limit given in the check configuration, or apply the default samples
limit, the failed rows expression needs to fire off an additional
SampleQuery. The SampleQuery executes a copy of the query, appending a
LIMIT clause.

sodadata#1985
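
As a rough, runnable illustration of that approach (a sketch using sqlite3 as a stand-in data source; none of these names are soda-core's actual internals):

import sqlite3

# Stand-in for the dataset from the reproduction above: 101 failing rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dataframe (number_one TEXT)")
conn.executemany("INSERT INTO dataframe VALUES (?)", [("1",)] * 101)

samples_limit = 5
failed_rows_sql = "SELECT * FROM dataframe WHERE 1 = 1"

# The parent store() evaluates the check; as noted later in this thread, it
# still fetches the full result set to compute the number of failed rows.
failed_row_count = len(conn.execute(failed_rows_sql).fetchall())

# The additional SampleQuery executes a copy of the query with a LIMIT
# clause appended, so the stored sample honors the configured limit.
sample_rows = conn.execute(f"{failed_rows_sql} LIMIT {samples_limit}").fetchall()

print(failed_row_count, len(sample_rows))  # 101 5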
denisdallinga (Author) commented:

I made a first attempt at a partial fix for this: #1986

This works for the failed row checks that use fail condition. For those checks it was possible to decouple the sampling from executing the check by making allow_samples an adjustable argument on Query.store. The UserDefinedFailedRowsExpressionQuery executes an additional SampleQuery after calling its parent's store method to collect and store the samples.

However, this approach doesn't work for the failed row checks that use fail query. There the coupling between sampling and evaluating the result of the check is tighter, and I couldn't figure out how to break it without refactoring larger parts of the code used for this check. The calculation of the failed_row_count metric for these checks relies on collecting the whole result set and measuring its size, so applying a limit implicitly would skew the metric for any check that returns more rows than the samples limit.
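
A simplified illustration of that coupling (plain Python, not soda-core code): the metric is derived from the fetched rows themselves, so pushing the limit into the query would silently change the metric:

# Pretend this is cursor.fetchall() on the fail query: 101 failing rows.
rows = [("1",)] * 101

failed_row_count = len(rows)  # 101: correct, but requires fetching every row
limited_rows = rows[:5]       # what a LIMIT 5 pushed into the query would return

# If the metric were computed from the limited fetch, it would report 5
# failed rows even though 101 rows actually fail the check.
print(failed_row_count, len(limited_rows))  # 101 5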

Can I get some thoughts from the Soda Team on:

  • My approach for the failed row checks that use fail condition: does it make sense? I don't fully grasp the Soda codebase and architecture yet, but this seemed reasonable, and I saw the same strategy applied by the DuplicateQuery and the ReferenceQuery.
  • Do you see any easy way to fix it for the failed row checks that use fail query? Or is my assessment correct that it most likely needs a bigger refactoring effort?

After your initial replies, I'll have a look at adding unit tests to the PR.

denisdallinga (Author) commented:

When going over my first solution with a colleague, I realised it is not going to solve the issue for us: for failed row checks that use fail condition, the whole dataframe is still collected in order to compute the number of failed rows, on this line: https://github.com/sodadata/soda-core/blob/main/soda/core/soda/execution/query/query.py#L204

Calculating the number of failed rows by retrieving the complete dataset and computing its length has scaling limitations, which we run into pretty frequently in our environment. Getting around that requires refactoring at a deeper level than my solution provides so far. When I have time, I'll try to figure out what is needed to solve the root cause of this scaling limitation.


nbixler commented Jan 16, 2024

Could we add get_row_length functionality in the class DbSample(Sample) like so?

    # Assumes `from google.cloud import bigquery` and that self.row_length
    # is initialized to None in __init__.
    def get_row_length(self) -> int:
        if self.row_length is None:
            try:
                client = bigquery.Client()
                # Count rows server-side instead of fetching the whole result set
                result = client.query(f"SELECT COUNT(*) FROM `{self.data_source}`").result()
                self.row_length = next(iter(result)).values()[0]
            except Exception:
                # Fall back to materializing the cursor's rows and counting them
                self.row_length = len(self.cursor.fetchall())
        return self.row_length

m1n0 (Contributor) commented Jun 17, 2024

This was addressed in 3.3.5: you can now combine a user-defined metric check with a failed rows query in one check, which makes it possible to limit the failed rows sample (see the sketch below).
We do not want to apply any after-the-fact limit (i.e. apply the samples limit after collecting the data), as that would hurt performance and memory consumption and behave inconsistently with other checks, where all limits are applied in the sample queries themselves.
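
For reference, a minimal sketch of what such a combined check might look like, reusing the scan from the reproduction script above. The SodaCL keys shown here are assumptions based on this description, not verified 3.3.5 syntax; consult the Soda documentation for the exact form:

# Assumed SodaCL syntax (illustrative only): a user-defined metric check
# that also carries a failed rows query, so the sample query can be limited.
check = """
checks for dataframe:
    - failing_rows = 0:
        failing_rows query: |
            SELECT COUNT(*) FROM dataframe WHERE 1 = 1
        failed rows query: |
            SELECT * FROM dataframe WHERE 1 = 1
        samples limit: 5
"""
scan.add_sodacl_yaml_str(check)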

@m1n0 m1n0 closed this as completed Jun 17, 2024