From 1a33fdefd8868cc864e7a27135f8d32bb312e947 Mon Sep 17 00:00:00 2001 From: Jan Heinrich Reimer Date: Tue, 14 Nov 2023 17:11:37 +0100 Subject: [PATCH] Add SERP WARC download CLI --- archive_query_log/cli/serps.py | 120 ++++++++++++++++++++++++++++++++- 1 file changed, 117 insertions(+), 3 deletions(-) diff --git a/archive_query_log/cli/serps.py b/archive_query_log/cli/serps.py index 05228981..a6f566c3 100644 --- a/archive_query_log/cli/serps.py +++ b/archive_query_log/cli/serps.py @@ -1,10 +1,9 @@ from datetime import datetime from itertools import chain -from typing import Iterable, Iterator, Any, Final -from warnings import warn +from typing import Iterable, Iterator, Final +from uuid import uuid5 from click import group, echo -from elasticsearch import ConnectionTimeout from elasticsearch_dsl import Search from elasticsearch_dsl.function import RandomScore from elasticsearch_dsl.query import Exists, FunctionScore, Script, Term @@ -13,8 +12,10 @@ from warcio.recordloader import ArcWarcRecord from web_archive_api.memento import MementoApi +from archive_query_log import __version__ as app_version from archive_query_log.cli.util import pass_config from archive_query_log.config import Config +from archive_query_log.namespaces import NAMESPACE_WARC_DOWNLOADER from archive_query_log.orm import Capture, Serp, InnerCapture, InnerParser, \ UrlQueryParser, Provider, InnerDownloader, WarcLocation from archive_query_log.parse.url_query import parse_url_query as \ @@ -156,3 +157,116 @@ def parse_url_query(config: Config) -> None: else: echo("No new/changed captures.") + +@serps.group() +def download(): + pass + + +class _SerpArcWarcRecord(ArcWarcRecord): + serp: Final[Serp] + + def __init__(self, serp: Serp, *args, **kwargs): + super().__init__(*args, **kwargs) + self.serp = serp + + +def _download_warc(config: Config, serp: Serp) -> Iterator[_SerpArcWarcRecord]: + memento_api = MementoApi( + api_url=serp.archive.memento_api_url, + session=config.http.session, + ) + records = memento_api.load_url_warc( + url=serp.capture.url, + timestamp=serp.capture.timestamp, + raw=True, + ) + serp_records = ( + _SerpArcWarcRecord(serp, record) + for record in records + ) + yield from serp_records + + +def _stored_serp(warc_record: WarcS3Record) -> tuple[Serp, WarcLocation]: + record: ArcWarcRecord = warc_record.record + if not isinstance(record, _SerpArcWarcRecord): + raise TypeError(f"Expected _SerpArcWarcRecord, got {type(record)}.") + + location = WarcLocation( + file=warc_record.location.key, + offset=warc_record.location.offset, + length=warc_record.location.length, + ) + return record.serp, location + + +@download.command(help="Download archived documents of captures as WARC.") +@pass_config +def warc(config: Config) -> None: + start_time = utc_now() + downloader_id_components = ( + config.s3.endpoint_url, + config.s3.bucket_name, + app_version, + ) + downloader_id = str(uuid5( + NAMESPACE_WARC_DOWNLOADER, + ":".join(downloader_id_components), + )) + downloader = InnerDownloader( + id=downloader_id, + last_downloaded=start_time, + ) + + changed_serps_search: Search = ( + Serp.search(using=config.es.client) + .filter( + ~Exists(field="last_modified") | + ~Exists(field="warc_downloader.last_downloaded") | + Script( + script="!doc['last_modified'].isEmpty() && " + "!doc['warc_downloader.last_downloaded']" + ".isEmpty() && " + "!doc['last_modified'].value.isBefore(" + "doc['warc_downloader.last_downloaded'].value" + ")", + ) + ) + .query(FunctionScore(functions=[RandomScore()])) + ) + num_changed_serps = ( + changed_serps_search.extra(track_total_hits=True) + .execute().hits.total.value) + + if num_changed_serps <= 0: + echo("No new/changed captures.") + return + + changed_serps: Iterable[Serp] = ( + changed_serps_search.params(preserve_order=True).scan()) + changed_serps = safe_iter_scan(changed_serps) + # noinspection PyTypeChecker + changed_serps = tqdm(changed_serps, total=num_changed_serps, + desc="Downloading WARCs", unit="SERP") + + # Download from Memento API. + serp_records = chain.from_iterable( + _download_warc(config, serp) + for serp in changed_serps + ) + + # Write to S3. + stored_records: Iterator[WarcS3Record] = ( + config.s3.warc_store.write(serp_records)) + stored_serps = (_stored_serp(record) for record in stored_records) + + for serp, location in stored_serps: + print(serp, location) + serp.update( + using=config.es.client, + retry_on_conflict=3, + warc_location=location, + warc_downloader=downloader, + ) + Serp.index().refresh(using=config.es.client)