Source code for arkimet.scan.grib

from typing import Callable
from collections import defaultdict
import arkimet
import _arkimet
import logging

Grib = _arkimet.scan.grib.Grib

log = logging.getLogger("arkimet.scan.grib")


[docs]class Scanner: by_edition = defaultdict(list) def __init__(self): for type, scanners in self.by_edition.items(): scanners.sort(key=lambda p: p[0])
[docs] def scan(self, grib: Grib, md: arkimet.Metadata): # Find the formatter list for this style scanners = self.by_edition.get(grib.edition) if scanners is None: return None # Try all scanner functions in the list, in priority order, stopping at # the first one that returns False. # Note that False is explicitly required: returning None will not stop # the scanner chain. for prio, scan in scanners: try: if scan(grib, md) is False: break except Exception: log.warning("scanner function failed", exc_info=True)
[docs] @classmethod def register(cls, edition: int, scanner: Callable[[Grib, arkimet.Metadata], None], priority=0): if scanner not in cls.by_edition[edition]: cls.by_edition[edition].append((priority, scanner))