Module ocean_science_utilities.filecache.cache_object
Contents: Simple file caching routines that automatically cache remote files locally for use.
Copyright (C) 2022 Sofar Ocean Technologies
Authors: Pieter Bart Smit
Classes:
- FileCache, the main class implementing the caching structure. It should not
be invoked directly. Instead, fetching/cache creation is controlled by a
set of functions defined below.
Functions:
Source code
"""
Contents: Simple file caching routines that automatically \
cache remote files locally for use.
Copyright (C) 2022
Sofar Ocean Technologies
Authors: Pieter Bart Smit
======================
Classes:
- `FileCache`, main class implementing the Caching structure. Should not
directly be invoked. Instead, fetching/cache creation is controlled by a
set of functions defined below
Functions:
"""
import hashlib
import json
import os
from dataclasses import dataclass
from pathlib import Path
from multiprocessing.pool import ThreadPool
from typing import Callable, Dict, List, Optional, Tuple, Union
from tqdm import tqdm
from warnings import warn
from ocean_science_utilities.tools.log import logger
from ocean_science_utilities.filecache.remote_resources import (
_RemoteResourceUriNotFound,
RemoteResourceHTTPS,
RemoteResource,
RemoteResourceLocal,
)
TEMPORARY_DIRECTORY = "~/temporary_roguewave_files/filecache/"
CACHE_SIZE_GB = 5
MAXIMUM_NUMBER_OF_WORKERS = 10
KILOBYTE = 1000
MEGABYTE = 1000 * KILOBYTE
GIGABYTE = 1000 * MEGABYTE
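# Note: sizes use decimal (SI) units throughout, i.e. 1 GB = 10**9 bytes
# (not the binary GiB, 2**30 bytes).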
def do_nothing(*args, **kwargs) -> Optional[bool]:
    """
    Null function used as the default download/post-processing callable.
    Accepts any arguments and returns None.
    """
    return None
@dataclass()
class CacheMiss:
"""
Data class for Cache miss.
"""
uri: str
filepath: str
filename: str
allow_for_missing_files: bool
post_process_function: Callable[[str], Optional[bool]]
download_function: Callable[[str, str], Optional[bool]] = do_nothing
class FileCacheConfig:
def __init__(
self,
size_gb: Union[float, int] = CACHE_SIZE_GB,
parallel: bool = True,
allow_for_missing_files: bool = True,
path: str = TEMPORARY_DIRECTORY,
):
        self.path = path
        # Assign to the private backing attributes directly; the public
        # property setters would try to write the config before it is loaded.
        self._size_gb = size_gb
        self._parallel = parallel
        self._allow_for_missing_files = allow_for_missing_files
if self.config_exists():
self.load_config()
else:
self._write_config()
@property
def name(self) -> str:
return os.path.join(self.path, "file_cache_config.json")
def config_exists(self) -> bool:
return os.path.exists(self.name)
def load_config(self) -> None:
with open(self.name, "rb") as fp:
config = json.load(fp)
            self._size_gb = config["size_gb"]
            self._parallel = config["parallel"]
            self._allow_for_missing_files = config["allow_for_missing_files"]
    def _update_config(self, key, value, write=True):
        # Store on the private backing attribute; `self[key] = value` would
        # require __setitem__, which this class does not define.
        setattr(self, f"_{key}", value)
if write:
self._write_config()
def _write_config(self):
        with open(self.name, "wt") as fp:
fp.write(
json.dumps(
{
"size_gb": self.size_gb,
"parallel": self.parallel,
"allow_for_missing_files": self.allow_for_missing_files,
},
indent=4,
)
)
@property
def max_size(self) -> Union[float, int]:
        return self._size_gb
@max_size.setter
def max_size(self, size_gb: float):
self._update_config("size_gb", size_gb)
@property
def max_size_bytes(self) -> int:
        return int(self._size_gb * GIGABYTE)
@max_size_bytes.setter
def max_size_bytes(self, size_bytes: int):
self._update_config("size_gb", size_bytes / GIGABYTE)
@property
def parallel(self) -> bool:
        # Return the backing attribute; `return self.parallel` would recurse.
        return self._parallel
@parallel.setter
def parallel(self, parallel: bool):
self._update_config("parallel", parallel)
@property
def allow_for_missing_files(self) -> bool:
        return self._allow_for_missing_files
@allow_for_missing_files.setter
def allow_for_missing_files(self, allow_for_missing_files: bool):
self._update_config("allow_for_missing_files", allow_for_missing_files)
class FileCache:
"""
    Simple file caching class that, when given a URI, stores the file
    locally in the cache directory and returns the path to the file. The file
    remains in storage until the cache directory exceeds a prescribed size,
    in which case files with the oldest access/modified dates get deleted
    first until everything fits in the cache again. Any time a file is
    accessed its modified date gets updated so that frequently used files
    automatically remain in cache.
The files are stored locally in the directory specified on class
initialization, as:
        [path]/CACHE_FILE_PREFIX + md5_hash_of_URI + CACHE_FILE_POSTFIX
    The pre- and postfix are added so we have an easy pattern to distinguish
    cache files from other files.
Methods
    * __getitem__(keys) : accepts a single uri_key or a list of URIs and
      returns filepaths to local copies thereof. You would typically use the
      cache[keys] notation instead of the dunder method.
    * purge() : clears all contents of the cache (destructive, deletes all
      local files).
Usage:
cache = FileCache()
        list_of_local_file_names = cache[ [list of URIs] ]
# do stuff with file
...
"""
CACHE_FILE_PREFIX = "cachefile_"
CACHE_FILE_POSTFIX = "_cachefile"
def __init__(
self,
path: str = TEMPORARY_DIRECTORY,
size_GB: Union[float, int] = CACHE_SIZE_GB,
do_cache_eviction_on_startup: bool = False,
resources: Optional[List[RemoteResource]] = None,
parallel: bool = True,
allow_for_missing_files: bool = True,
):
"""
Initialize Cache
:param path: path to store cache. If path does not exist it will be
created.
        :param size_GB: Maximum size of the cache in GB (decimal, 10**9
            bytes). If the cache exceeds this size, files with the oldest
            access/modified dates get deleted until everything fits in the
            cache again. Fractional values (floats) are allowed.
        :param do_cache_eviction_on_startup: whether to ensure the cache size
            conforms to the given size on startup. If set to True, a cache
            directory that exceeds the maximum size will be reduced to the
            maximum size. Defaults to False, in which case an error is raised
            instead. The latter is to prevent erroneously evicting files from
            a cache that was deliberately created with a larger size than the
            limit.
"""
self.path = os.path.expanduser(path)
# create the path if it does not exist
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
self.config = FileCacheConfig(size_GB, parallel, allow_for_missing_files, path)
# Some counters to keep track of total cache misses, hits and
        # evictions. No downstream use right now.
self._cache_misses = 0
self._cache_hits = 0
self._cache_evictions = 0
self.disable_progress_bar = False
# initialize the cache.
self._entries: Dict[str, str] = {} # the key/value pair cache
self._initialize_cache(do_cache_eviction_on_startup)
# Post processing and validation functions
self.directives: Dict[str, Dict[str, Callable[[str], Optional[bool]]]] = {
"validate": {},
"postprocess": {},
}
# message to display on progress bar
self.description = "Caching"
# download resources
if resources is None:
self.resources = [
RemoteResourceHTTPS(),
RemoteResourceLocal(),
]
else:
self.resources = resources
def set_directive_function(
self,
directive,
name,
function: Callable[[str], Optional[bool]],
):
"""
AI is creating summary for set_directive_function
Args:
directive ([type]): [description]
name ([type]): [description]
function (Callable[[str], Optional[bool]]): [description]
Raises:
KeyError: [description]
ValueError: [description]
"""
if directive not in self.directives:
raise KeyError(f"{directive} is not a valid cache directive.")
if name in self.directives[directive]:
raise ValueError(f"Function for {name} already exists")
else:
self.directives[directive][name] = function
def remove_directive_function(self, directive: str, name: str):
if directive not in self.directives:
raise KeyError(f"{directive} is not a valid cache directive.")
if name not in self.directives[directive]:
raise ValueError(f"Function for {name} does not exist")
else:
self.directives[directive].pop(name)
def _cache_file_name(self, uri: str) -> str:
"""
        Return the filename that corresponds to the given uri. We construct
        the file name using a simple md5 hash of the uri string prefixed
        with a cache file prefix. The latter is introduced to separate cache
        files in a path from user files (and avoid including/deleting those).
        :param uri: valid uri stripped of directives
        :return: valid cache file name
"""
return self.CACHE_FILE_PREFIX + _hashname(uri) + self.CACHE_FILE_POSTFIX
def _cache_file_path(self, uri: str) -> str:
"""
Construct the path where the given uri is stored locally.
        :param uri: valid uri stripped of directives.
        :return: valid cache file path
"""
return os.path.join(self.path, self._cache_file_name(uri))
def _get_cache_files(self) -> List[str]:
"""
Find all files that are currently a member of the cache.
:return:
"""
        # Note: only the first os.walk result is inspected; the for/else
        # returns [] if os.walk yields nothing (e.g. the path is missing).
        for path, dirs, files in os.walk(self.path):
            # Return all files that are "cache" objects. This is a safety in
            # case other user files are present, so that these don't
            # accidentally get evicted from the cache (i.e. deleted).
            return [
                file
                for file in files
                if file.startswith(self.CACHE_FILE_PREFIX)
                and file.endswith(self.CACHE_FILE_POSTFIX)
            ]
        else:
            return []
def _initialize_cache(self, do_cache_eviction_on_startup: bool) -> None:
"""
Initialize the file cache. Look on disk for files in the cache path
that have the required prefix and load these into the cache. Once
loaded, we do a check whether or not the cache is full and if we need
to remove files.
:param do_cache_eviction_on_startup: see description under __init__
:return:
"""
self._entries = {}
for file in self._get_cache_files():
filepath = os.path.join(self.path, file)
self._entries[file] = filepath
# See if cache is still < size
if do_cache_eviction_on_startup:
self._cache_eviction()
else:
if self._size() > self.config.max_size_bytes:
raise ValueError(
"The cache currently existing on disk "
"exceeds the maximum cache size of the "
f"current cache ({self.config.max_size} gb)."
f"\n The cache size can be increased by"
f" editting the cache config file: "
f"{self.config.name}"
)
def in_cache(self, unparsed_uris) -> List[bool]:
# make sure input is a list
if isinstance(unparsed_uris, str):
unparsed_uris = [unparsed_uris]
uris, _ = parse_directives(unparsed_uris)
        # Create the hashes from the URIs
hashes = [self._cache_file_name(uri) for uri in uris]
return [self._is_in_cache(_hash) for _hash in hashes]
def _is_in_cache(self, _hash: str) -> bool:
"""
Check if a _hash is in the cache
:param _hash: hash to check
:return: True if in Cache, False if not.
"""
cache_hit = _hash in self._entries
return cache_hit
def _add_to_cache(self, _hash: str, filepath: str) -> None:
# add entry to the cache.
self._entries[_hash] = filepath
def remove(self, unparsed_uri: str) -> None:
"""
Remove an entry from the cache
:param unparsed_uri: uri
:return: None
"""
uri, _ = parse_directive(unparsed_uri)
        # Note: in_cache returns a list of booleans, one per uri.
        if not self.in_cache(uri)[0]:
raise ValueError(f"Key {uri} not in Cache")
_hash = self._cache_file_name(uri)
return self._remove_item_from_cache(_hash)
def _remove_item_from_cache(self, _hash: str) -> None:
"""
        Remove a hash key from the cache. Here it is assumed that the _hash is
        a valid entry. We do allow for non-existence of corresponding files as
        the cache can get out of sync if something external deleted the file.
        Since the end state is valid (no entry in cache, no entry on disk)
        this is considered OK.
:param _hash: hash key
:return: None
"""
if _hash not in self._entries:
return None
file_to_delete = self._entries.pop(_hash)
if os.path.exists(file_to_delete):
logger.debug(f" - removing {_hash}")
# And delete file.
os.remove(file_to_delete)
else:
logger.debug(f" - file {_hash} did not exist on disk")
return None
def _get_from_cache(self, _hash: str) -> str:
"""
Get entry from cache and touch the file to indicate it has been used
recently (last to be evicted)
:param _hash: file_hash corresponding to uri
:return: file path
"""
filepath = self._entries[_hash]
if not os.path.exists(filepath):
            raise FileNotFoundError(
                "The filepath in the cache log does not exist on disk."
            )
# Touch the file to indicate we recently used it.
Path(filepath).touch()
return filepath
def get_cache_misses(
self, uris: List[str], directives: List[Dict[str, str]]
) -> List[CacheMiss]:
"""
Function to get all cache misses and return a list of CacheMiss objects
needed to download the misses from remote resources.
        This function also performs validation on potential cache hits if a
        relevant validation function is set *and* validation is requested
        through a directive.
:param uris: list of uris stripped of directives
:param directives: list of directives per uri (empty dict if none)
:return: list of cache misses
"""
cache_misses = []
for uri, directive in zip(uris, directives):
# what is the hashkey/filename
hashkey = self._cache_file_name(uri)
filepath = self._cache_file_path(uri)
# is the key in cache?
valid_entry: Optional[bool] = False
if self._is_in_cache(hashkey):
# If so is it a valid entry
if "validate" in directive:
# Call the user supplied validation function with the
# filepath as argument
validation_function = self.directives["validate"][
directive["validate"]
]
try:
valid_entry = validation_function(filepath)
except IOError:
valid_entry = False
if not valid_entry:
# remove the locally stored entry if not valid
os.remove(filepath)
else:
valid_entry = True
else:
# Defaults to True if no validation directive is given
valid_entry = True
            if not valid_entry:
                # Not a valid entry (either missing or invalid); build a
                # CacheMiss describing what needs to be downloaded.
if "postprocess" in directive:
# Add the postprocess function to use if requested.
post_process_function = self.directives["postprocess"][
directive["postprocess"]
]
else:
# otherwise set a null function as postprocessor
post_process_function = do_nothing
                # Remove any comment appended to the URI (used only to make
                # the hash unique) before downloading; see __getitem__.
                uri_to_download = uri.split(">>")[0]
cache_misses.append(
CacheMiss(
uri=uri_to_download,
filepath=filepath,
filename=hashkey,
allow_for_missing_files=self.config.allow_for_missing_files,
post_process_function=post_process_function,
)
)
return cache_misses
def __len__(self) -> int:
"""
:return: Number of entries in the cache.
"""
return len(self._entries)
def __getitem__(self, unparsed_uris: Union[List, str]) -> List[str]:
"""
Get filenames corresponding to locally stored versions of the objects
the URI points to. Note that the unparsed_uris take the form:
[ directive=option ; ... directive=option ] ":"
[scheme] "://" [path] ">>" [comment]
        e.g. for Amazon S3, where we want to perform validation and
        post-processing on entries:
validate=grib;postprocess=grib:s3://bucket/key
or without cache directives
s3://bucket/key
Cache directives are optional, but if specified the corresponding
user defined handling function must have been set. By default no
validation or postprocessing functions are set.
In addition, we can add a "comment" to the uri by appending
">>[comment]", e.g.
s3://bucket/key>>THISISTHECOMMENT
The comment (including ">>") is stripped prior to downloading from
the remote resource. However, the comment part *is* included in
generation of the hash. This allows us to cache the same resource with
different names. This is primarily useful if we potentially apply
different postprocessing functions to the same remote resource.
        :param unparsed_uris: URIs that may still include directives.
:return:
"""
# make sure input is a list
if isinstance(unparsed_uris, str):
unparsed_uris = [unparsed_uris]
# Remove cache directives from uris (if included)
uris, directives = parse_directives(unparsed_uris)
filepaths = [self._cache_file_path(uri) for uri in uris]
        # for all URIs not in cache
if cache_misses := self.get_cache_misses(uris, directives):
            was_successfully_downloaded = _download_from_resources(
                cache_misses,
                self.resources,
                parallel_download=self.config.parallel,
                disable_progress_bar=self.disable_progress_bar,
                desc=self.description,
            )
            for cache_miss, success in zip(cache_misses, was_successfully_downloaded):
if success:
self._add_to_cache(cache_miss.filename, cache_miss.filepath)
else:
index = filepaths.index(cache_miss.filepath)
filepaths.pop(index)
size_of_requested_data = _get_total_size_of_files_in_bytes(filepaths)
if size_of_requested_data > self.config.max_size_bytes:
            warning = (
                f"The requested data does not fit into the cache. "
                f"To avoid issues the cache is enlarged to ensure "
                f"the current set of files fits in the cache.\n"
                f"old size: {self.config.max_size_bytes} bytes; "
                f"new size: {size_of_requested_data + MEGABYTE} bytes"
            )
)
warn(warning)
logger.warning(warning)
self.config.max_size_bytes = size_of_requested_data + MEGABYTE
self._cache_misses += len(cache_misses)
self._cache_hits += len(uris) - len(cache_misses)
        # See if we need to do any cache eviction because the cache has
        # become too big.
        if cache_misses:
self._cache_eviction()
return filepaths
def _cache_eviction(self) -> bool:
"""
Simple cache eviction policy. If the cache exceeds the maximum size
remove data from the cache based on whichever file was interacted with
the longest time ago. Evict files until we are below the acceptable
cache size.
        :return: True if eviction occurred, False otherwise.
"""
# check if we exceed the size, if not return
if not self._size() > self.config.max_size_bytes:
return False
# Get access/modified times for all the files in cache
modified = []
for _hash, fp in self._entries.items():
            # Access times are not always reliably updated by the OS, so we
            # use the later of the access and modified times as an indicator
            # of when we last interacted with the file.
access_time = os.path.getatime(fp)
modified_time = os.path.getmtime(fp)
# pick whichever is most recent.
time_to_check = (
access_time if access_time > modified_time else modified_time
)
modified.append((time_to_check, _hash))
# Sort files in reversed chronological order.
files_in_cache = [
x[1] for x in sorted(modified, key=lambda x: x[0], reverse=True)
]
# Delete files one by one as long as the cache_size exceeds the max
# size.
while (_size := self._size()) > self.config.max_size_bytes:
self._cache_evictions += 1
logger.debug(
f"Cache exceeds limits: {_size} bytes, max size: "
f"{self.config.max_size_bytes} bytes"
)
# Get the hash and path of the oldest file and remove
self._remove_item_from_cache(files_in_cache.pop())
return True
def _size(self) -> int:
"""
Return size on disk of the cache in bytes.
:return: cache size in bytes.
"""
return _get_total_size_of_files_in_bytes(
list(self._entries.values()), self.path
)
def purge(self) -> None:
"""
Delete all the files in the cache.
:return: None
"""
logger.debug("Purging cache")
keys = list(self._entries.keys())
for key in keys:
filepath = self._entries.pop(key)
logger.debug(" - deleting {filepath}")
os.remove(filepath)
logger.debug("Purging cache done")
def _download_from_resources(
cache_misses: List[CacheMiss],
resources: List[RemoteResource],
parallel_download=False,
disable_progress_bar=False,
desc="",
) -> List[bool]:
"""
Wrapper function to download multiple uris from the resource(s).
    :param cache_misses: List containing cache misses to download
    :param resources: List of remote resources used to serve the URIs
    :param parallel_download: If true, downloading is performed in parallel.
    :param disable_progress_bar: If true, suppress the tqdm progress bar.
    :param desc: Description shown on the progress bar.
    :return: List of booleans indicating if each download was a success.
"""
def _worker(cache_miss: CacheMiss) -> bool:
try:
cache_miss.download_function(cache_miss.uri, cache_miss.filepath)
cache_miss.post_process_function(cache_miss.filepath)
return True
except _RemoteResourceUriNotFound as e:
if cache_miss.allow_for_missing_files:
warning = f"Uri not retrieved: {str(e)}"
warn(warning)
logger.warning(warning)
else:
raise e
return False
# construct the arguments to be used for parallel downloading of files.
# Specifically, we need to match the right resource for downloading to the
# right URI.
for cache_miss in cache_misses:
# Loop over all resources until we find one that can interpret the URI
        # (this is a pretty naive approach and should probably be refactored
        # to some direct mapping if the number of resources ever gets large)
for resource in resources:
# For each resource check if the resource can interpret the URI
if resource.valid_uri(cache_miss.uri):
# If so, get the download function, and other arguments and
# break
cache_miss.download_function = resource.download()
break
else:
# If we didn't break the loop no valid resource was found, raise
# error
raise ValueError(f"No resource available for URI: " f"{cache_miss.uri}")
# Download the requested objects.
if parallel_download and len(cache_misses) > 1:
with ThreadPool(processes=MAXIMUM_NUMBER_OF_WORKERS) as pool:
output = list(
tqdm(
pool.imap(_worker, cache_misses, chunksize=5),
desc=desc,
total=len(cache_misses),
)
)
else:
if len(cache_misses) == 1:
disable_progress_bar = True
output = list(
tqdm(
map(_worker, cache_misses),
total=len(cache_misses),
disable=disable_progress_bar,
desc=desc,
)
)
return output
def _get_total_size_of_files_in_bytes(filenames: List[str], path=None) -> int:
"""
Simple function to calculate the size of a list of files on disk.
:param filenames: list of filenames or filepaths
    :param path: directory containing the files when bare filenames are
        given; otherwise None and the filenames are used as full paths
:return: Total size in bytes
"""
size = 0
for filename in filenames:
if path is None:
filepath = filename
else:
filepath = os.path.join(path, filename)
if os.path.exists(filepath):
size += os.path.getsize(filepath)
return size
def _hashname(string: str) -> str:
"""
    Returns an md5 hash of a given string.
    :param string: input string
    :return: hexdigest of the md5 hash.
"""
return hashlib.md5(string.encode(), usedforsecurity=False).hexdigest()
def parse_directives(raw_uris: List[str]) -> Tuple[List[str], List[dict]]:
uris = []
directives = []
for raw_uri in raw_uris:
uri, directive = parse_directive(raw_uri)
uris.append(uri)
directives.append(directive)
return uris, directives
def parse_directive(unparsed_uri: str) -> Tuple[str, dict]:
"""
unparsed_uris take the form:
[ directive=option ; ... directive=option ] ":" [scheme] "://" [path]
    e.g. for Amazon S3, where we want to perform validation and
    post-processing on entries:
        validate=grib;postprocess=grib:s3://bucket/key
    or without cache directives
        s3://bucket/key
    This function separates the directive/option pairs into a directive
    dictionary and a valid uri, i.e.
        validate=grib;postprocess=grib:s3://bucket/key
    becomes
        directive = {"validate": "grib", "postprocess": "grib"}
        uri = "s3://bucket/key"
    The parsing is deliberately simple.
:param unparsed_uri: uri possibly containing cache directives
:return:
"""
    # split into a directives/scheme part and a path.
directives_and_scheme, path = unparsed_uri.split("://")
parsed_directives = {}
# if a colon is present then directives are provided.
if ":" in directives_and_scheme:
# split directives from the scheme
directive_str, scheme = directives_and_scheme.split(":")
# split multiple directives (if present)
directives = directive_str.split(";")
# for each directive store in the dict.
for directive in directives:
directive_name, directive_parameter = directive.split("=")
parsed_directives[directive_name] = directive_parameter
else:
# no directives
scheme = directives_and_scheme
uri = scheme + "://" + path
return uri, parsed_directives
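A minimal end-to-end usage sketch (the cache path, URL, and validator below are hypothetical, not part of the module):

import os
from ocean_science_utilities.filecache.cache_object import FileCache

def file_is_nonempty(filepath: str) -> bool:
    # hypothetical validator: treat zero-byte downloads as invalid
    return os.path.getsize(filepath) > 0

cache = FileCache(path="~/my_cache", size_GB=2)
cache.set_directive_function("validate", "nonempty", file_is_nonempty)

# Fetch through the cache; the ">>run1" comment only affects the hash.
local_paths = cache[["validate=nonempty:https://example.com/data.nc>>run1"]]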
Functions
def do_nothing(*args, **kwargs) ‑> Optional[bool]
-
Null function used as the default download/post-processing callable. Accepts any arguments and returns None.
def parse_directive(unparsed_uri: str) ‑> Tuple[str, dict]
-
unparsed_uris take the form:
[ directive=option ; ... directive=option ] ":" [scheme] "://" [path]
e.g. for Amazon S3, where we want to perform validation and post-processing on entries: validate=grib;postprocess=grib:s3://bucket/key or without cache directives s3://bucket/key
This function separates the directive/option pairs into a directive dictionary and a valid uri, i.e.
validate=grib;postprocess=grib:s3://bucket/key
becomes
directive = {"validate": "grib", "postprocess": "grib"} and uri = "s3://bucket/key"
The parsing is deliberately simple.
:param unparsed_uri: uri possibly containing cache directives
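For instance (hypothetical bucket/key):

uri, directives = parse_directive("validate=grib;postprocess=grib:s3://bucket/key")
# uri        == "s3://bucket/key"
# directives == {"validate": "grib", "postprocess": "grib"}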
def parse_directives(raw_uris: List[str]) ‑> Tuple[List[str], List[dict]]
-
Classes
class CacheMiss (uri: str, filepath: str, filename: str, allow_for_missing_files: bool, post_process_function: Callable[[str], Optional[bool]], download_function: Callable[[str, str], Optional[bool]] = <function do_nothing>)
-
Data class for Cache miss.
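For illustration, a CacheMiss constructed by hand (all values hypothetical; normally FileCache.get_cache_misses builds these):

miss = CacheMiss(
    uri="https://example.com/data.nc",
    filepath="/tmp/cache/cachefile_<md5>_cachefile",
    filename="cachefile_<md5>_cachefile",
    allow_for_missing_files=True,
    post_process_function=do_nothing,
)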
Class variables
var allow_for_missing_files : bool
var filename : str
var filepath : str
var post_process_function : Callable[[str], Optional[bool]]
var uri : str
Methods
def download_function(*args, **kwargs) ‑> Optional[bool]
-
Null function used as the default download callable. Accepts any arguments and returns None.
class FileCache (path: str = '~/temporary_roguewave_files/filecache/', size_GB: Union[float, int] = 5, do_cache_eviction_on_startup: bool = False, resources: Optional[List[RemoteResource]] = None, parallel: bool = True, allow_for_missing_files: bool = True)
-
Simple file caching class that, when given a URI, stores the file locally in the cache directory and returns the path to the file. The file remains in storage until the cache directory exceeds a prescribed size, in which case files with the oldest access/modified dates get deleted first until everything fits in the cache again. Any time a file is accessed its modified date gets updated so that frequently used files automatically remain in cache.
The files are stored locally in the directory specified on class initialization, as:
[path]/CACHE_FILE_PREFIX + md5_hash_of_URI + CACHE_FILE_POSTFIX
The pre- and postfix are added so we have an easy pattern to distinguish cache files from other files.
Methods
* __getitem__(keys) : accepts a single uri_key or a list of URIs and returns filepaths to local copies thereof. You would typically use the cache[keys] notation instead of the dunder method.
* purge() : clears all contents of the cache (destructive, deletes all local files).
Usage
cache = FileCache()
list_of_local_file_names = cache[[list_of_uris]]
# do stuff with file
...
Initialize Cache. :param path: path to store cache. If the path does not exist it will be created. :param size_GB: Maximum size of the cache in GB (decimal). If the cache exceeds this size, files with the oldest access/modified dates get deleted until everything fits in the cache again. Fractional values (floats) are allowed. :param do_cache_eviction_on_startup: whether to ensure the cache size conforms to the given size on startup. If set to True, a cache directory that exceeds the maximum size will be reduced to the maximum size. Defaults to False, in which case an error is raised instead. The latter is to prevent erroneously evicting files from a cache that was deliberately created with a larger size than the limit.
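A minimal initialization sketch (hypothetical path):

cache = FileCache(
    path="~/my_cache",                    # created if it does not exist
    size_GB=2.5,                          # fractional sizes are allowed
    do_cache_eviction_on_startup=True,    # trim an oversized pre-existing cache
)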
Class variables
var CACHE_FILE_POSTFIX
var CACHE_FILE_PREFIX
Methods
def get_cache_misses(self, uris: List[str], directives: List[Dict[str, str]]) ‑> List[CacheMiss]
-
Function to get all cache misses and return a list of CacheMiss objects needed to download the misses from remote resources.
This function also performs validation on potential cache hits if a relevant validation function is set and validation is requested through a directive.
:param uris: list of uris stripped of directives :param directives: list of directives per uri (empty dict if none) :return: list of cache misses
def in_cache(self, unparsed_uris) ‑> List[bool]
-
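For example (hypothetical URI; one boolean is returned per input):

cache = FileCache()
cache.in_cache("https://example.com/data.nc")  # -> [False] on a cold cache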
def purge(self) ‑> None
-
Delete all the files in the cache. :return: None
def remove(self, unparsed_uri: str) ‑> None
-
Remove an entry from the cache :param unparsed_uri: uri :return: None
def remove_directive_function(self, directive: str, name: str)
-
def set_directive_function(self, directive, name, function: Callable[[str], Optional[bool]])
-
Register a user-supplied directive function under the given name.
Args
directive
:str
- directive category; one of "validate" or "postprocess".
name
:str
- name under which the function is registered and referenced in URI directives.
function
:Callable[[str], Optional[bool]]
- callable that takes a local filepath; validators should return True for a valid file.
Raises
KeyError
- if the directive category is not known.
ValueError
- if a function is already registered under this name.
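A sketch of registering and later removing a postprocess function (the logging step is hypothetical):

cache = FileCache()

def log_download(filepath: str) -> None:
    print(f"downloaded: {filepath}")

cache.set_directive_function("postprocess", "log", log_download)
paths = cache[["postprocess=log:https://example.com/data.nc"]]
cache.remove_directive_function("postprocess", "log")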
class FileCacheConfig (size_gb: Union[float, int] = 5, parallel: bool = True, allow_for_missing_files: bool = True, path: str = '~/temporary_roguewave_files/filecache/')
-
Instance variables
var allow_for_missing_files : bool
-
var max_size : Union[float, int]
-
var max_size_bytes : int
-
var name : str
-
var parallel : bool
-
Methods
def config_exists(self) ‑> bool
-
def load_config(self) ‑> None
-