Commit 95701db

Add more sync records + fix small bug in monitoring task causing deletion metrics to never be emitted (#3837)

Double check we don't double-emit + fix pruning metric

Add log

Fix comment

Rename

Weves authored Jan 30, 2025
1 parent 2410525 commit 95701db
Showing 7 changed files with 275 additions and 106 deletions.
@@ -11,6 +11,7 @@
from celery.exceptions import SoftTimeLimitExceeded
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session

from ee.onyx.db.connector_credential_pair import get_all_auto_sync_cc_pairs
from ee.onyx.db.document import upsert_document_external_perms
@@ -31,12 +31,17 @@
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.connector import mark_cc_pair_as_permissions_synced
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.document import upsert_document_by_connector_credential_pair
from onyx.db.engine import get_session_with_tenant
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.models import ConnectorCredentialPair
from onyx.db.sync_record import insert_sync_record
from onyx.db.sync_record import update_sync_record_status
from onyx.db.users import batch_add_ext_perm_user_if_not_exists
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_doc_perm_sync import (
@@ -57,6 +63,9 @@
LIGHT_TIME_LIMIT = LIGHT_SOFT_TIME_LIMIT + 15


"""Jobs / utils for kicking off doc permissions sync tasks."""


def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
"""Returns boolean indicating if external doc permissions sync is due."""

@@ -174,6 +183,15 @@ def try_creating_permissions_sync_task(

    custom_task_id = f"{redis_connector.permissions.generator_task_key}_{uuid4()}"

    # create before setting fence to avoid race condition where the monitoring
    # task updates the sync record before it is created
    with get_session_with_tenant(tenant_id) as db_session:
        insert_sync_record(
            db_session=db_session,
            entity_id=cc_pair_id,
            sync_type=SyncType.EXTERNAL_PERMISSIONS,
        )

    # set a basic fence to start
    payload = RedisConnectorPermissionSyncPayload(started=None, celery_task_id=None)
    redis_connector.permissions.set_fence(payload)
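
For illustration, the ordering contract condensed into a hypothetical helper (every name comes from the imports above; the helper itself is not in the commit). The external group sync change later in this commit follows the same two-step pattern.

def _kick_off_permissions_sync_sketch(tenant_id: str | None, cc_pair_id: int) -> None:
    # 1) Create the durable DB row first, so the monitoring task always
    #    finds a sync record to update.
    with get_session_with_tenant(tenant_id) as db_session:
        insert_sync_record(
            db_session=db_session,
            entity_id=cc_pair_id,
            sync_type=SyncType.EXTERNAL_PERMISSIONS,
        )

    # 2) Only then publish the Redis fence that makes the monitor look at
    #    this cc_pair. Reversing the two steps reintroduces the race.
    redis_connector = RedisConnector(tenant_id, cc_pair_id)
    redis_connector.permissions.set_fence(
        RedisConnectorPermissionSyncPayload(started=None, celery_task_id=None)
    )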
@@ -400,3 +418,53 @@ def update_external_document_permissions_task(
f"Error Syncing Document Permissions: connector_id={connector_id} doc_id={doc_id}"
)
return False


"""Monitoring CCPair permissions utils, called in monitor_vespa_sync"""


def monitor_ccpair_permissions_taskset(
    tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
) -> None:
    fence_key = key_bytes.decode("utf-8")
    cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
    if cc_pair_id_str is None:
        task_logger.warning(
            f"monitor_ccpair_permissions_taskset: could not parse cc_pair_id from {fence_key}"
        )
        return

    cc_pair_id = int(cc_pair_id_str)

    redis_connector = RedisConnector(tenant_id, cc_pair_id)
    if not redis_connector.permissions.fenced:
        return

    initial = redis_connector.permissions.generator_complete
    if initial is None:
        return

    remaining = redis_connector.permissions.get_remaining()
    task_logger.info(
        f"Permissions sync progress: cc_pair={cc_pair_id} remaining={remaining} initial={initial}"
    )
    if remaining > 0:
        return

    payload: RedisConnectorPermissionSyncPayload | None = (
        redis_connector.permissions.payload
    )
    start_time: datetime | None = payload.started if payload else None

    mark_cc_pair_as_permissions_synced(db_session, int(cc_pair_id), start_time)
    task_logger.info(f"Successfully synced permissions for cc_pair={cc_pair_id}")

    update_sync_record_status(
        db_session=db_session,
        entity_id=cc_pair_id,
        sync_type=SyncType.EXTERNAL_PERMISSIONS,
        sync_status=SyncStatus.SUCCESS,
        num_docs_synced=initial,
    )

    redis_connector.permissions.reset()
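
A minimal sketch of how the monitor side might drive this helper, assuming monitor_vespa_sync scans Redis for permission fence keys (the match pattern below is an assumption for illustration):

def _monitor_all_permission_syncs_sketch(tenant_id: str | None, r: Redis) -> None:
    with get_session_with_tenant(tenant_id) as db_session:
        # "connectorpermissions_fence*" is a hypothetical match pattern
        for key_bytes in r.scan_iter("connectorpermissions_fence*"):
            monitor_ccpair_permissions_taskset(tenant_id, key_bytes, r, db_session)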
@@ -33,7 +33,11 @@
from onyx.db.engine import get_session_with_tenant
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.models import ConnectorCredentialPair
from onyx.db.sync_record import insert_sync_record
from onyx.db.sync_record import update_sync_record_status
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_ext_group_sync import (
    RedisConnectorExternalGroupSyncPayload,
@@ -200,6 +204,15 @@ def try_creating_external_group_sync_task(
            celery_task_id=result.id,
        )

        # create before setting fence to avoid race condition where the monitoring
        # task updates the sync record before it is created
        with get_session_with_tenant(tenant_id) as db_session:
            insert_sync_record(
                db_session=db_session,
                entity_id=cc_pair_id,
                sync_type=SyncType.EXTERNAL_GROUP,
            )

        redis_connector.external_group_sync.set_fence(payload)

    except Exception:
@@ -289,11 +302,26 @@ def connector_external_group_sync_generator_task(
            )

            mark_cc_pair_as_external_group_synced(db_session, cc_pair.id)

            update_sync_record_status(
                db_session=db_session,
                entity_id=cc_pair_id,
                sync_type=SyncType.EXTERNAL_GROUP,
                sync_status=SyncStatus.SUCCESS,
            )
    except Exception as e:
        task_logger.exception(
            f"Failed to run external group sync: cc_pair={cc_pair_id}"
        )

        with get_session_with_tenant(tenant_id) as db_session:
            update_sync_record_status(
                db_session=db_session,
                entity_id=cc_pair_id,
                sync_type=SyncType.EXTERNAL_GROUP,
                sync_status=SyncStatus.FAILED,
            )

        redis_connector.external_group_sync.generator_clear()
        redis_connector.external_group_sync.taskset_clear()
        raise e
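
The success/failure bookkeeping above, reduced to its shape as a hypothetical wrapper (the real task does this inline; SyncStatus and update_sync_record_status come from the imports above):

from collections.abc import Callable

def _run_group_sync_with_record_sketch(
    tenant_id: str | None, cc_pair_id: int, sync_fn: Callable[[], None]
) -> None:
    status = SyncStatus.SUCCESS
    try:
        sync_fn()
    except Exception:
        status = SyncStatus.FAILED
        raise  # the finally block records FAILED before the exception propagates
    finally:
        with get_session_with_tenant(tenant_id) as db_session:
            update_sync_record_status(
                db_session=db_session,
                entity_id=cc_pair_id,
                sync_type=SyncType.EXTERNAL_GROUP,
                sync_status=status,
            )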
129 changes: 106 additions & 23 deletions backend/onyx/background/celery/tasks/monitoring/tasks.py
@@ -58,6 +58,11 @@
"sync_start_latency:{sync_type}:{entity_id}:{sync_record_id}"
)

_CONNECTOR_START_TIME_KEY_FMT = "connector_start_time:{cc_pair_id}:{index_attempt_id}"
_CONNECTOR_END_TIME_KEY_FMT = "connector_end_time:{cc_pair_id}:{index_attempt_id}"
_SYNC_START_TIME_KEY_FMT = "sync_start_time:{sync_type}:{entity_id}:{sync_record_id}"
_SYNC_END_TIME_KEY_FMT = "sync_end_time:{sync_type}:{entity_id}:{sync_record_id}"


def _mark_metric_as_emitted(redis_std: Redis, key: str) -> None:
"""Mark a metric as having been emitted by setting a Redis key with expiration"""
@@ -303,8 +308,6 @@ def _build_connector_final_metrics(
                )
            )

-            _mark_metric_as_emitted(redis_std, metric_key)

    return metrics


@@ -344,6 +347,52 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
        if one_hour_ago > most_recent_attempt.time_created:
            continue

        # Build a job_id for correlation
        job_id = build_job_id(
            "connector", str(cc_pair.id), str(most_recent_attempt.id)
        )

        # Add raw start time metric if available
        if most_recent_attempt.time_started:
            start_time_key = _CONNECTOR_START_TIME_KEY_FMT.format(
                cc_pair_id=cc_pair.id,
                index_attempt_id=most_recent_attempt.id,
            )
            metrics.append(
                Metric(
                    key=start_time_key,
                    name="connector_start_time",
                    value=most_recent_attempt.time_started.timestamp(),
                    tags={
                        "job_id": job_id,
                        "connector_id": str(cc_pair.connector.id),
                        "source": str(cc_pair.connector.source),
                    },
                )
            )

        # Add raw end time metric if available and in terminal state
        if (
            most_recent_attempt.status.is_terminal()
            and most_recent_attempt.time_updated
        ):
            end_time_key = _CONNECTOR_END_TIME_KEY_FMT.format(
                cc_pair_id=cc_pair.id,
                index_attempt_id=most_recent_attempt.id,
            )
            metrics.append(
                Metric(
                    key=end_time_key,
                    name="connector_end_time",
                    value=most_recent_attempt.time_updated.timestamp(),
                    tags={
                        "job_id": job_id,
                        "connector_id": str(cc_pair.connector.id),
                        "source": str(cc_pair.connector.source),
                    },
                )
            )
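
        # Illustrative aside (not part of the diff): the keys built above are
        # stable per attempt, e.g. for hypothetical ids cc_pair 7 / attempt 42:
        #
        #   assert _CONNECTOR_START_TIME_KEY_FMT.format(
        #       cc_pair_id=7, index_attempt_id=42
        #   ) == "connector_start_time:7:42"
        #
        # That stability is what lets the emit-once check in
        # monitor_background_processes suppress duplicates across hourly passes.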

        # Connector start latency
        start_latency_metric = _build_connector_start_latency_metric(
            cc_pair, most_recent_attempt, second_most_recent_attempt, redis_std
@@ -365,9 +414,10 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
"""
Collect metrics for document set and group syncing:
- Success/failure status
- Start latency (always)
- Start latency (for doc sets / user groups)
- Duration & doc count (only if success)
- Throughput (docs/min) (only if success)
- Raw start/end times for each sync
"""
one_hour_ago = get_db_current_time(db_session) - timedelta(hours=1)

@@ -389,6 +439,43 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
        # Build a job_id for correlation
        job_id = build_job_id("sync_record", str(sync_record.id))

        # Add raw start time metric
        start_time_key = _SYNC_START_TIME_KEY_FMT.format(
            sync_type=sync_record.sync_type,
            entity_id=sync_record.entity_id,
            sync_record_id=sync_record.id,
        )
        metrics.append(
            Metric(
                key=start_time_key,
                name="sync_start_time",
                value=sync_record.sync_start_time.timestamp(),
                tags={
                    "job_id": job_id,
                    "sync_type": str(sync_record.sync_type),
                },
            )
        )

        # Add raw end time metric if available
        if sync_record.sync_end_time:
            end_time_key = _SYNC_END_TIME_KEY_FMT.format(
                sync_type=sync_record.sync_type,
                entity_id=sync_record.entity_id,
                sync_record_id=sync_record.id,
            )
            metrics.append(
                Metric(
                    key=end_time_key,
                    name="sync_end_time",
                    value=sync_record.sync_end_time.timestamp(),
                    tags={
                        "job_id": job_id,
                        "sync_type": str(sync_record.sync_type),
                    },
                )
            )

        # Emit a SUCCESS/FAIL boolean metric
        # Use a single Redis key to avoid re-emitting final metrics
        final_metric_key = _FINAL_METRIC_KEY_FMT.format(
@@ -439,7 +526,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
        if duration_seconds is not None:
            metrics.append(
                Metric(
-                   key=None,
+                   key=final_metric_key,
                    name="sync_duration_seconds",
                    value=duration_seconds,
                    tags={
@@ -455,7 +542,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]

        metrics.append(
            Metric(
-               key=None,
+               key=final_metric_key,
                name="sync_doc_count",
                value=doc_count,
                tags={
@@ -468,7 +555,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
        if sync_speed is not None:
            metrics.append(
                Metric(
-                   key=None,
+                   key=final_metric_key,
                    name="sync_speed_docs_per_min",
                    value=sync_speed,
                    tags={
@@ -482,9 +569,6 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
f"Invalid sync record {sync_record.id} with no duration"
)

# Mark final metrics as emitted so we don't re-emit
_mark_metric_as_emitted(redis_std, final_metric_key)

# Emit start latency
start_latency_key = _SYNC_START_LATENCY_KEY_FMT.format(
sync_type=sync_record.sync_type,
@@ -502,22 +586,20 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
            entity = db_session.scalar(
                select(UserGroup).where(UserGroup.id == sync_record.entity_id)
            )
-       else:
-           task_logger.info(
-               f"Skipping sync record {sync_record.id} of type {sync_record.sync_type}."
-           )
-           continue

        if entity is None:
            task_logger.error(
-               f"Could not find entity for sync record {sync_record.id} "
-               f"(type={sync_record.sync_type}, id={sync_record.entity_id})."
+               f"Sync record of type {sync_record.sync_type} doesn't have an entity "
+               f"associated with it (id={sync_record.entity_id}). Skipping start latency metric."
            )
            continue

        # Calculate start latency in seconds:
        # (actual sync start) - (last modified time)
-       if entity.time_last_modified_by_user and sync_record.sync_start_time:
+       if (
+           entity is not None
+           and entity.time_last_modified_by_user
+           and sync_record.sync_start_time
+       ):
            start_latency = (
                sync_record.sync_start_time - entity.time_last_modified_by_user
            ).total_seconds()
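
            # Illustrative aside (not part of the diff): the computation above,
            # applied to hypothetical timestamps.
            #
            #   modified = datetime(2025, 1, 30, 12, 0, 0)    # entity.time_last_modified_by_user
            #   started = datetime(2025, 1, 30, 12, 0, 45)    # sync_record.sync_start_time
            #   (started - modified).total_seconds() == 45.0  # start latency of 45 seconds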
@@ -541,8 +623,6 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
                )
            )

-        _mark_metric_as_emitted(redis_std, start_latency_key)

    return metrics


@@ -607,9 +687,12 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None:
        for metric_fn in metric_functions:
            metrics = metric_fn()
            for metric in metrics:
-                metric.log()
-                metric.emit(tenant_id)
-                if metric.key:
+                # double check to make sure we aren't double-emitting metrics
+                if metric.key is not None and not _has_metric_been_emitted(
+                    redis_std, metric.key
+                ):
+                    metric.log()
+                    metric.emit(tenant_id)
                    _mark_metric_as_emitted(redis_std, metric.key)

        task_logger.info("Successfully collected background metrics")