Skip to content

Commit

Permalink
Fix release colors update
Browse files Browse the repository at this point in the history
The mapping database in the config is an outdated replica of the MB db that hasn't been
updated in 6 months, probably because the dumps DB was moved to a new node while we
still had the old coordinates hardcoded. Change it to use the MB standby instead.

The process_row function had a bug in downloading and processing the images.

Mostly stylistic changes other than that.
  • Loading branch information
amCap1712 committed May 10, 2024
1 parent f303a82 commit 0b0e0fd
Showing 1 changed file with 112 additions and 117 deletions.
229 changes: 112 additions & 117 deletions mbid_mapping/mapping/release_colors.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import datetime
import os
import re
import subprocess
from time import sleep
from threading import Thread, get_ident

import psycopg2
from psycopg2.errors import OperationalError
from psycopg2.extensions import register_adapter
import requests

Expand All @@ -15,7 +12,6 @@
from mapping.cube import Cube, adapt_cube
from mapping.utils import log


register_adapter(Cube, adapt_cube)

# max number of threads to use -- with 2 we don't need to worry about rate limiting.
Expand All @@ -33,8 +29,6 @@ def process_image(filename, mime_type):
and return the (red, green, blue) tuple """

with open(filename, "rb") as raw:
proc = subprocess.Popen(["file", filename], stdout=subprocess.PIPE)
tmp = proc.communicate(raw.read())
proc = subprocess.Popen(["jpegtopnm", filename],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
tmp = proc.communicate(raw.read())
Expand All @@ -45,10 +39,10 @@ def process_image(filename, mime_type):

lines = out[0].split(b"\n", 3)
if lines[0].startswith(b"P6"): # PPM
return (lines[3][0], lines[3][1], lines[3][2])
return lines[3][0], lines[3][1], lines[3][2]

if lines[0].startswith(b"P5"): # PGM
return (lines[3][0], lines[3][0], lines[3][0])
return lines[3][0], lines[3][0], lines[3][0]

raise RuntimeError

Expand All @@ -75,10 +69,11 @@ def process_row(row):
""" Process one CAA query row, by fetching the 250px thumbnail,
process the color, then import into the DB """

sleep_duation = 2
sleep_duration = 2
while True:
headers = {
'User-Agent': 'ListenBrainz HueSound Color Bot ( [email protected] )'}
'User-Agent': 'ListenBrainz HueSound Color Bot ( [email protected] )'
}
release_mbid, caa_id = row["release_mbid"], row["caa_id"]
url = f"https://archive.org/download/mbid-{release_mbid}/mbid-{release_mbid}-{caa_id}_thumb250.jpg"
r = requests.get(url, headers=headers)
Expand All @@ -90,16 +85,14 @@ def process_row(row):

try:
red, green, blue = process_image(filename, row["mime_type"])
insert_row(row["release_mbid"], red,
green, blue, row["caa_id"])
insert_row(row["release_mbid"], red, green, blue, row["caa_id"])
log("%s %s: (%s, %s, %s)" %
(row["caa_id"], row["release_mbid"], red, green, blue))
(row["caa_id"], row["release_mbid"], red, green, blue))
except Exception as err:
log("Could not process %s" % url)
log(err)

os.unlink(filename)
sleep_duation = 2
break

if r.status_code == 403:
Expand Down Expand Up @@ -129,13 +122,12 @@ def process_row(row):
break


def delete_from_lb(lb_conn, caa_id):
    """Remove one piece of cover art from the release_color table.

    The caller owns *lb_conn*; the deletion is committed on that connection
    before returning.
    """
    with lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as curs:
        curs.execute("""DELETE FROM release_color WHERE caa_id = %s """, (caa_id,))
    lb_conn.commit()


def process_cover_art(threads, row):
Expand Down Expand Up @@ -190,19 +182,19 @@ def get_cover_art_counts(mb_curs, lb_curs):
def get_last_updated_from_caa():
    """Return the most recent date_uploaded value from the CAA cover_art
    table on the MB standby database, or None if it cannot be read."""

    last_updated = None
    with psycopg2.connect(config.MB_DATABASE_STANDBY_URI) as mb_conn, \
            mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs:
        mb_curs.execute("""SELECT max(date_uploaded) AS date_uploaded
                             FROM cover_art_archive.cover_art""")
        row = mb_curs.fetchone()
        if row:
            try:
                last_updated = row["date_uploaded"]
            except ValueError:
                # NOTE(review): psycopg2 row lookups normally raise KeyError,
                # not ValueError — this handler may be dead code; confirm
                # before removing.
                pass

    return last_updated


def sync_release_color_table():
Expand Down Expand Up @@ -286,89 +278,92 @@ def compare_coverart(mb_query, lb_query, mb_caa_index, lb_caa_index, mb_compare_
the corresponding compare key. The starting indexes (the current comparison index
into the data) must be provided and match the type of the comparison keys. """

with psycopg2.connect(config.MBID_MAPPING_DATABASE_URI) as mb_conn:
with mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs:
with psycopg2.connect(config.SQLALCHEMY_DATABASE_URI) as lb_conn:
with lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as lb_curs:

mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs)
log("CAA count: %d\n LB count: %d" % (mb_count, lb_count))

threads = []
mb_row = None
lb_row = None

mb_rows = []
lb_rows = []

mb_done = False
lb_done = True if lb_query is None else False

extra = 0
missing = 0
processed = 0

while True:
if len(mb_rows) == 0 and not mb_done:
mb_curs.execute(
mb_query, (mb_caa_index, SYNC_BATCH_SIZE))
mb_rows = mb_curs.fetchall()
if len(mb_rows) == 0:
mb_done = True

if len(lb_rows) == 0 and not lb_done:
lb_curs.execute(
lb_query, (lb_caa_index, SYNC_BATCH_SIZE))
lb_rows = lb_curs.fetchall()
if len(lb_rows) == 0:
lb_done = True

if not mb_row and len(mb_rows) > 0:
mb_row = mb_rows.pop(0)

if not lb_row and len(lb_rows) > 0:
lb_row = lb_rows.pop(0)

if not lb_row and not mb_row:
break

processed += 1
if processed % 100000 == 0:
log("processed %d of %d: missing %d extra %d" %
(processed, mb_count, missing, extra))

# If the item is in MB, but not in LB, add to LB
if lb_row is None or mb_row[mb_compare_key] < lb_row[lb_compare_key]:
process_cover_art(threads, mb_row)
missing += 1
mb_caa_index = mb_row[mb_compare_key]
mb_row = None
continue

# If the item is in LB, but not in MB, remove from LB
if mb_row is None or mb_row[mb_compare_key] > lb_row[lb_compare_key]:
extra += 1
delete_from_lb(lb_row[lb_compare_key])
lb_caa_index = lb_row[lb_compare_key]
lb_row = None
continue

# If the caa_id is present in both, skip both
if mb_row[mb_compare_key] == lb_row[lb_compare_key]:
mb_caa_index = mb_row[mb_compare_key]
lb_caa_index = lb_row[lb_compare_key]
lb_row = None
mb_row = None
continue

assert False

join_threads(threads)
log( "Finished! added/skipped %d removed %d from release_color" % (missing, extra))

mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs)
log("CAA count: %d\n LB count: %d" % (mb_count, lb_count))

metrics.init("listenbrainz")
metrics.set("listenbrainz-caa-mapper",
caa_front_count=mb_count, lb_caa_count=lb_count)
with psycopg2.connect(config.MB_DATABASE_STANDBY_URI) as mb_conn, \
mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs, \
psycopg2.connect(config.SQLALCHEMY_DATABASE_URI) as lb_conn, \
lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as lb_curs:

mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs)
log("CAA count: %d" % (mb_count,))
log("LB count: %d" % (lb_count,))

threads = []
mb_row = None
lb_row = None

mb_rows = []
lb_rows = []

mb_done = False
lb_done = True if lb_query is None else False

extra = 0
missing = 0
processed = 0

while True:
if len(mb_rows) == 0 and not mb_done:
mb_curs.execute(mb_query, (mb_caa_index, SYNC_BATCH_SIZE))
mb_rows = mb_curs.fetchall()
if len(mb_rows) == 0:
mb_done = True

if len(lb_rows) == 0 and not lb_done:
lb_curs.execute(lb_query, (lb_caa_index, SYNC_BATCH_SIZE))
lb_rows = lb_curs.fetchall()
if len(lb_rows) == 0:
lb_done = True

if not mb_row and len(mb_rows) > 0:
mb_row = mb_rows.pop(0)

if not lb_row and len(lb_rows) > 0:
lb_row = lb_rows.pop(0)

if not lb_row and not mb_row:
break

processed += 1
if processed % 100000 == 0:
log("processed %d of %d: missing %d extra %d" % (processed, mb_count, missing, extra))

# If the item is in MB, but not in LB, add to LB
if lb_row is None or mb_row[mb_compare_key] < lb_row[lb_compare_key]:
process_cover_art(threads, mb_row)
missing += 1
mb_caa_index = mb_row[mb_compare_key]
mb_row = None
continue

# If the item is in LB, but not in MB, remove from LB
if mb_row is None or mb_row[mb_compare_key] > lb_row[lb_compare_key]:
extra += 1
delete_from_lb(lb_conn, lb_row[lb_compare_key])
lb_caa_index = lb_row[lb_compare_key]
lb_row = None
continue

# If the caa_id is present in both, skip both
if mb_row[mb_compare_key] == lb_row[lb_compare_key]:
mb_caa_index = mb_row[mb_compare_key]
lb_caa_index = lb_row[lb_compare_key]
lb_row = None
mb_row = None
continue

assert False

join_threads(threads)
log("Finished! added/skipped %d removed %d from release_color" % (missing, extra))

mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs)
log("CAA count: %d" % (mb_count,))
log("LB count: %d" % (lb_count,))

metrics.init("listenbrainz")
metrics.set(
"listenbrainz-caa-mapper",
caa_front_count=mb_count,
lb_caa_count=lb_count
)

0 comments on commit 0b0e0fd

Please sign in to comment.