Source code for pdsparser.utils

##########################################################################################
# pdsparser/_utils.py
##########################################################################################
"""Utility functions."""

import re
from filecache import FCPath


def read_label(filepath, *, chars=4000):
    """Read the PDS3 label from a file.

    Supports attached labels within binary files.

    Parameters:
        filepath (str, pathlib.Path, or filecache.FCPath):
            The path to the file. If the file does not contain a PDS3 label, a detached
            label (with the same path but ending in ".lbl" or ".LBL") is read instead.
        chars (int, optional):
            Initial number of bytes to read from the top of a binary file when
            extracting the label. The read size is doubled after each unsuccessful
            search, and reads continue until the END statement is found or the file is
            exhausted. Values below 1 are treated as 1.

    Returns:
        str: The content of the label as a single string with newline terminators.

    Raises:
        FileNotFoundError: If the label file is missing.
        SyntaxError: If the END statement is not found in a binary file and no detached
            label exists.

    Notes:
        If the `filepath` ends in ".lbl" or ".LBL", it is assumed to refer to a detached
        label and the entire file content is returned. Otherwise, it reads the file
        (which may be binary) until it finds an "END" statement.
    """

    filepath = FCPath(filepath)
    if filepath.suffix.upper() == '.LBL':
        return filepath.read_text(encoding='latin-1')

    # A read size of zero never advances through the file and a negative one defeats the
    # end-of-file test below; either would leave the loop spinning forever.
    chars = max(chars, 1)

    # Define regular expressions for label
    _COMMENT = re.compile(rb'(/\*.*?)\n')
    _DOUBLE = re.compile(rb'(".*?")')
    _SINGLE = re.compile(rb"('.*?')")
    _END = re.compile(rb'\n *END *\r?\n')

    # Open file for read; treat it as binary
    with filepath.open(mode='rb') as f:         # pragma: no branch
        content = b''
        end_of_file = False
        while not end_of_file:                  # pragma: no branch

            # Read more content from file; a short read means nothing is left
            chunk = f.read(chars)
            content += chunk
            end_of_file = len(chunk) < chars

            # Replace comments with "xxx"; preserve content size and newlines.
            # The split consumes the newline after each comment, so it is re-appended.
            parts = _COMMENT.split(content)
            parts[1::2] = [len(part) * b'x' + b'\n' for part in parts[1::2]]
            test = b''.join(parts)

            # Replace quoted text with "xxx"; preserve content size
            parts = _DOUBLE.split(test)
            parts[1::2] = [len(part) * b'x' for part in parts[1::2]]
            test = b''.join(parts)

            # Replace single-quoted text with 'xxx'; preserve content size
            parts = _SINGLE.split(test)
            parts[1::2] = [len(part) * b'x' for part in parts[1::2]]
            test = b''.join(parts)

            # Offsets found in the masked copy are only valid in the original if the
            # two have the same length.
            assert len(test) == len(content), 'quote replacement error'

            # Now it's safe to search for "END"
            match = _END.search(test)
            if match:
                content = content[:match.end()].replace(b'\r\n', b'\n')
                return content.decode('latin-1')

            # If not found, read more content and try again
            chars *= 2

    # Check for a detached .LBL file
    for suffix in ('.lbl', '.LBL'):
        alt_filepath = filepath.with_suffix(suffix)
        if alt_filepath.exists():
            return read_label(alt_filepath)

    raise SyntaxError(f'missing END statement in {filepath}')

def read_vax_binary_label(filepath):
    """Read an attached PDS3 label from a Vax binary file that uses variable-length
    records.

    Parameters:
        filepath (str, pathlib.Path, or filecache.FCPath):
            The path to the file. A detached label (ending in ".lbl" or ".LBL") is read
            using "stream" format; any other file is read assuming Vax variable-length
            format (in which the first two bytes of each record contain the length of
            the remaining record). If the file does not contain a PDS3 label, a detached
            label (with same path but ending in ".lbl" or ".LBL") is read instead.

    Returns:
        str: The content of the label as a single string with newline terminators.

    Raises:
        FileNotFoundError: If the label file is missing.
        SyntaxError: If no END record is found in the file and no detached label
            exists.
    """

    filepath = FCPath(filepath)
    if filepath.suffix.upper() == '.LBL':
        return read_label(filepath)

    # Read from Vax-structured file (where first two bytes are the record length)
    ended = False
    recs = []
    with filepath.open(mode='rb') as f:
        while True:
            header = f.read(2)              # read the two-byte record length
            if len(header) < 2:             # at EOF (or a truncated header), stop
                break

            count = int.from_bytes(header, 'little')    # length is LSB-first
            rec = f.read(count)             # read record with this many bytes
            recs.append(rec)                # append this record to content

            if rec.strip() == b'END':       # on "END", we're done
                ended = True
                break

            if len(rec) % 2 == 1:           # if the record length is odd...
                f.read(1)                   # ... skip the next byte

    if ended:
        content = b'\n'.join(recs) + b'\n'
        return content.decode('latin-1')

    # No END record was found; fall back to a detached label if there is one
    for suffix in ('.lbl', '.LBL'):
        alt_filepath = filepath.with_suffix(suffix)
        if alt_filepath.exists():
            return read_label(alt_filepath)

    raise SyntaxError(f'missing END statement in {filepath}')

def expand_structures(content, fmt_dirs=[], *, repairs=[], label_path=None):
    """Replace any ^STRUCTURE keywords in the label with the content of the associated
    ".FMT" files.

    Parameters:
        content (str):
            The label content in which ^STRUCTURE keywords are to be expanded.
        fmt_dirs (str, pathlib.Path, filecache.FCPath, or list, optional):
            One or more directory paths to search for the ".FMT" files. If neither this
            nor `label_path` is given, the current directory is searched.
        repairs (tuple or list[tuple]):
            One or more two-element tuples of the form (pattern, replacement), where the
            first item is a regular expression and the second is the string with which
            to replace it. These repair patterns are applied to the content of each
            ".FMT" file before it is inserted into the label, and make it possible to
            repair known syntax errors.
        label_path (str, pathlib.Path, filecache.FCPath, optional):
            The path to the label file from which the content was obtained; if provided,
            the parent directory of this file is the first to be searched for .FMT
            files.

    Returns:
        str: The revised content string.

    Raises:
        FileNotFoundError: If a referenced .FMT file cannot be found in any of the
            directories specified.
    """

    # NOTE: the list defaults above are never mutated here; both names are only rebound.

    # Define key regular expressions. The file name may be delimited by either kind of
    # quote. END is only recognized when it stands alone on the final non-blank line, so
    # that a last line whose value merely ends in "END" is left intact.
    _STRUCTURE = re.compile(r' *\^[A-Z0-9_]*STRUCTURE *= *["\'](.*?)["\'] *\n')
    _END = re.compile(r'^[ \t]*END[ \t]*\n*\Z', re.MULTILINE)

    # Obtain the list of directories to search
    if not isinstance(fmt_dirs, (list, tuple)):
        fmt_dirs = [fmt_dirs]

    fmt_dirs = [FCPath(dir_) for dir_ in fmt_dirs]

    if label_path:
        fmt_dirs = [FCPath(label_path).parent] + fmt_dirs

    if not fmt_dirs:    # if no path is provided, search the local default dir
        fmt_dirs = [FCPath('.')]

    # A single (pattern, replacement) tuple is shorthand for a one-item list
    if isinstance(repairs, tuple):
        repairs = [repairs]

    # Replace ^STRUCTURE keywords, one by one. The search restarts from the top each
    # time, so ^STRUCTURE keywords inside an inserted .FMT file are expanded too.
    while (match := _STRUCTURE.search(content)):
        k0, k1 = match.span()
        fmt_name = match.group(1)

        # Find the .FMT file in the first directory that contains it
        for fmt_dir in fmt_dirs:
            fmt_path = fmt_dir / fmt_name
            if fmt_path.exists():
                break
        else:
            raise FileNotFoundError('file not found: ' + fmt_name)

        fmt_content = fmt_path.read_text(encoding='latin-1')

        # Don't include END from .FMT file
        if end_match := _END.search(fmt_content):
            fmt_content = fmt_content[:end_match.start()]

        # Repair content if necessary
        for pattern, replacement in repairs:
            fmt_content = re.sub(pattern, replacement, fmt_content)

        # Replace
        content = content[:k0] + fmt_content + content[k1:]

    return content

def _format_float(value):
    """Convert a number to a string that always contains a decimal point.

    A trailing ".0" is shortened to "."; if the text has no decimal point, one is
    inserted in front of the exponent marker, or else appended at the end.

    Parameters:
        value: The value to format; it is converted using `str`.

    Returns:
        str: The formatted text.
    """

    text = str(value)
    if text.endswith('.0'):
        text = text[:-1]

    if '.' in text:
        return text

    # Put the decimal point in front of the exponent, if there is one
    text = text.replace('e', '.e').replace('E', '.E')
    if '.' in text:
        return text

    return text + '.'


def _based_int(radix, digits):
    """The integer value associated with a based integer.

    Parameters:
        radix (int): The base in which the digits are expressed.
        digits (str): The digit characters, most significant first; letters may be in
            either case.

    Returns:
        int: The accumulated value; zero if there are no digits.

    Raises:
        ValueError: If a character is not one of "0"-"9" or "A"-"F".
    """

    alphabet = '0123456789ABCDEF'

    total = 0
    for char in digits:
        total = total * radix + alphabet.index(char.upper())

    return total


def _is_identifier(text):
    """True if the text is upper case, begins with a letter once any underscores are
    removed, and otherwise consists only of letters, digits, and underscores.
    """

    if text.upper() != text:
        return False

    bare = text.replace('_', '')
    return bare[:1].isalpha() and bare.isalnum()


def _unique_key(name, dict_, dups=None):
    """This name if it is not in the dict_; otherwise with a numeric suffix appended to
    make it unique.

    Parameters:
        name (str): The proposed key.
        dict_ (dict): The dictionary whose keys must not be duplicated.
        dups (set, optional): If provided, `name` is added to this set whenever a suffix
            had to be appended.

    Returns:
        str: `name`, or `name` followed by "_2", "_3", etc.
    """

    if name not in dict_:
        return name

    # Try suffixes from 2 upward until an unused key turns up
    counter = 2
    candidate = '_'.join((name, str(counter)))
    while candidate in dict_:
        counter += 1
        candidate = '_'.join((name, str(counter)))

    if dups is not None:
        dups.add(name)

    return candidate


def _unwrap(text):
    """Remove indents and extra newlines inside paragraphs.

    Parameters:
        text (str): The text to unwrap. A literal backslash followed by "n" is treated
            as an explicit ("old-style") newline.

    Returns:
        str: The text with wrapped lines merged into paragraphs, runs of blank lines
        collapsed to a single blank line, and outer whitespace stripped.
    """

    # Drop trailing whitespace from every line
    lines = [line.rstrip() for line in text.split('\n')]

    # Skip over any leading blank lines
    start = 0
    while start < len(lines) and not lines[start]:
        start += 1

    lines = lines[start:]
    if not lines:
        return ''

    # The first line's indent is ignored; the common indent comes from the non-empty
    # lines that follow it
    head = lines[0].lstrip()
    rest = lines[1:]
    widths = [len(line) - len(line.lstrip()) for line in rest if line]
    indent = min([9999] + widths)

    lines = [head] + [line[indent:] for line in rest]

    # A line that is still indented starts on a new line of its own
    lines = ['\n' + line if line[:1].isspace() else line for line in lines]

    # An old-style newline at the end of the last line is unnecessary
    if lines[-1].endswith('\\n'):
        lines[-1] = lines[-1][:-2].rstrip()

    # An old-style newline at the end of any other line becomes a newline at the front
    # of the next line. Lines are read by index because an earlier pass may have
    # already altered them.
    for k in range(len(lines) - 1):
        if not lines[k].endswith('\\n'):
            continue

        lines[k] = lines[k][:-2].rstrip()
        following = lines[k+1]
        if following and not following.startswith('\n'):
            lines[k+1] = '\n' + following

    # Convert any remaining explicit newlines into real ones
    lines = ['\n'.join(piece.rstrip() for piece in line.split('\\n'))
             for line in lines]

    # Merge lines into paragraphs: blank lines become paragraph breaks; other lines are
    # joined by a space unless whitespace already separates them
    merged = lines[:1]
    for line in lines[1:]:
        if not line:
            merged.append('\n\n')
        elif line[0].isspace() or merged[-1][-1].isspace():
            merged.append(line)
        else:
            merged.append(' ' + line)

    # Collapse any run of two or more newlines to exactly two
    result = re.sub(r'\n\n+', '\n\n', ''.join(merged))

    return result.strip()

##########################################################################################