"""Module containing methods for parsing GEDCOM files.
"""
__all__ = ['GedcomReader', 'ParserError', 'CodecError', 'IntegrityError',
'guess_codec', 'GedcomLine']
import codecs
import io
import logging
import re
from typing import List, NamedTuple, Optional
from .detail.io import check_bom, guess_lineno, BinaryFileCR
from . import model
_log = logging.getLogger(__name__)
# records are bytes, regex is for bytes too
_re_GedcomLine = re.compile(br"""
^
[ ]*(?P<level>\d+) # integer level number
(?:[ ]*(?P<xref>@[A-Z-a-z0-9][^@]*@))? # optional @xref@
[ ]*(?P<tag>[A-Z-a-z0-9_]+) # tag name
(?:[ ](?P<value>.*))? # optional value
$
""", re.X)
[docs]class GedcomLine(NamedTuple):
"""Class representing single line in a GEDCOM file.
.. note::
Mostly for internal use by parser, most clients do not need to know
about this class.
Attributes
----------
level : `int`
xref_id : `str`, possibly empty or ``None``
tag : `str`, required, non-empty
value : `bytes`, possibly empty or ``None``
offset : `int`
"""
level: int
"""Record level number (`int`)"""
xref_id: Optional[str]
"""Reference for this record (`str` or ``None``)"""
tag: str
"""Tag name (`str`)"""
value: bytes
"""Record value (`bytes`)"""
offset: int
"""Record offset in a file (`int`)"""
[docs]class ParserError(Exception):
"""Class for exceptions raised for parsing errors.
"""
pass
[docs]class IntegrityError(Exception):
"""Class for exceptions raised for structural errors, e.g. when record
level nesting is inconsistent.
"""
pass
[docs]class CodecError(ParserError):
"""Class for exceptions raised for codec-related errors.
"""
pass
[docs]def guess_codec(file, errors="strict", require_char=False, warn=True):
"""Look at file contents and guess its correct encoding.
File must be open in binary mode and positioned at offset 0. If BOM
record is present then it is assumed to be UTF-8 or UTF-16 encoded
file. GEDCOM header is searched for CHAR record and encoding name
is extracted from it, if BOM record is present then CHAR record
must match BOM-defined encoding.
Parameters
----------
file
File object, must be open in binary mode.
errors : `str`, optional
Controls error handling behavior during string decoding, accepts same
values as standard `codecs.decode` method.
require_char : `bool`, optional
If ``True`` then exception is thrown if CHAR record is not found in a
header, if False and CHAR is not in the header then codec determined
from BOM or "gedcom" is returned.
warn : `bool`, optional
If True (default) then generate error/warning messages for illegal
encodings.
Returns
-------
codec_name : `str`
The name of the codec in this file.
bom_size : `int`
Size of the BOM record, 0 if no BOM record.
Raises
------
CodecError
Raised if codec name in file is unknown or when codec name in file
contradicts codec determined from BOM.
UnicodeDecodeError
Raised if codec fails to decode input lines and `errors` is set to
"strict" (default).
"""
# set of illegal but unambiguous encodings and their corresponding codecs
illegal_encodings = {
"windows-1250": "cp1250",
"windows-1251": "cp1251",
"cp1252": "cp1252",
"iso-8859-1": "iso8859-1",
"iso8859-1": "iso8859-1",
}
# set of ambiguous (and illegal) encodings
ambiguous_encodings = {
'ibmpc': 'cp437',
"ibm": "cp437",
"ibm-pc": "cp437",
"oem": "cp437",
"msdos": "cp850",
"ibm dos": "cp850",
"ms-dos": "cp850",
"ansi": "cp1252",
"windows": "cp1252",
"ibm_windows": "cp1252",
"ibm windows": "cp1252",
"iso8859": "iso8859-1",
"latin1": "iso8859-1",
"macintosh": "mac-roman",
}
illegal_encodings.update(ambiguous_encodings)
# full set of encodings, including legal ones
gedcom_char_to_codec = {"ansel": "gedcom"}
gedcom_char_to_codec.update(illegal_encodings)
# check BOM first
bom_codec = check_bom(file)
bom_size = file.tell()
codec = bom_codec or 'gedcom'
# scan header until CHAR or end of header
lineno = 0
while True:
lineno += 1
# this stops at '\n'
line = file.readline()
if not line:
raise IOError("Unexpected EOF while reading GEDCOM header")
# do not decode bytes to strings here, reason is that some
# stupid apps split CONC record at byte level (in middle of
# of multi-byte characters). This implies that we can only
# work with encodings that have ASCII as single-byte subset.
line = line.lstrip().rstrip(b"\r\n")
words = line.split()
if len(words) >= 2 and words[0] == b"0" and words[1] != b"HEAD":
# past header but have not seen CHAR
if require_char:
raise CodecError("GEDCOM header does not have CHAR record")
else:
break
elif len(words) >= 3 and words[0] == b"1" and words[1] == b"CHAR":
try:
enc = b" ".join(words[2:]).decode(codec, errors)
encoding = gedcom_char_to_codec.get(enc.lower(), enc.lower())
if enc.lower() in illegal_encodings and warn:
_log.error("Line %d: \"%s\" - \"%s\" is not a legal "
"character set or encoding.", lineno, line, enc)
if enc.lower() in ambiguous_encodings:
_log.warning("Character set (\"%s\") is ambiguous, it "
"will be interpreted as \"%s\"",
enc, encoding)
new_codec = codecs.lookup(encoding).name
except LookupError:
raise CodecError("Unknown codec name '{0}'".format(enc))
if bom_codec is None:
codec = new_codec
elif new_codec != bom_codec:
raise CodecError("CHAR codec {0} is different from BOM "
"codec {1}".format(new_codec, bom_codec))
break
return codec, bom_size
[docs]class GedcomReader:
"""Main interface for reading GEDCOM files.
Parameters
----------
file
File name or file object open in binary mode, file must be seekable.
encoding : `str`, optional
If ``None`` (default) then file is analyzed using `guess_codec()`
method to determine correct codec. Otherwise file is open using
specified codec.
errors : `str`, optional
Controls error handling behavior during string decoding, accepts same
values as standard `codecs.decode` method.
require_char : `bool`, optional
If True then exception is thrown if CHAR record is not found in a
header, if False and CHAR is not in the header then codec determined
from BOM or "gedcom" is used.
Notes
-----
Instance of this class is used to read and parse single GEDCOM file.
Records in GEDCOM file are transformed into instances of types defined in
`ged4py.model` module, either `ged4py.model.Record` class or one of its
sub-classes. Main method of access to the data in the file is by iterating
over level-0 records, optionally restricted by the tag name. The method
which does this is `GedcomReader.records0()`. Most commonly the code which
reads GEDCOM file at the top-level loop will look like this::
with GedcomReader(path) as parser:
# iterate over each INDI record in a file
for record in parser.records0("INDI"):
# do something with the record or navigate to other linked records
"""
def __init__(self, file, encoding=None, errors="strict",
require_char=False):
self._encoding = encoding
self._errors = errors
self._bom_size = 0
self._index0 = None # list of level=0 record positions
self._xref0 = None # maps xref_id to level=0 record position
self._header = None
self._dialect = None
# open the file
if hasattr(file, 'read'):
# assume it is a file already
if hasattr(file, 'seekable'):
# check that it supports seek()
if not file.seekable():
raise IOError("Input file does not support seek.")
self._file = file
else:
raw = io.FileIO(file)
self._file = io.BufferedReader(raw)
self._file = BinaryFileCR(self._file)
# check codec and BOM
try:
encoding, self._bom_size = guess_codec(self._file,
errors=self._errors,
require_char=require_char,
warn=self._encoding is None)
except Exception:
self._file.close()
raise
self._file.seek(self._bom_size)
if not self._encoding:
self._encoding = encoding
@property
def index0(self):
"""List of level=0 record positions and tag names (`list[(int, str)]`).
"""
if self._index0 is None:
self._init_index()
return self._index0
@property
def xref0(self):
"""Dictionary which maps xref_id to level=0 record position and
tag name (`dict[str, (int, str)]`).
"""
if self._xref0 is None:
self._init_index()
return self._xref0
@property
def header(self):
"""Header record (`ged4py.model.Record`).
"""
if self._index0 is None:
self._init_index()
return self._header
def _init_index(self):
_log.debug("in _init_index")
self._index0 = []
self._xref0 = {}
# scan whole file for level=0 records
for gline in self.GedcomLines(self._bom_size):
_log.debug(" _init_index gline: %s", gline)
if gline.level == 0:
self._index0.append((gline.offset, gline.tag))
if gline.xref_id:
self._xref0[gline.xref_id] = (gline.offset, gline.tag)
_log.debug(" _init_index gline: done proc")
if self._index0 and self._index0[0][1] == 'HEAD':
self._header = self.read_record(self._index0[0][0])
_log.debug("_init_index done")
@property
def dialect(self):
"""File dialect as one of `ged4py.model.Dialect` enums.
"""
if self._dialect is None:
self._dialect = model.Dialect.DEFAULT
if self.header:
source = self.header.sub_tag("SOUR")
if source:
if source.value == "MYHERITAGE":
self._dialect = model.Dialect.MYHERITAGE
elif source.value in ("ALTREE", "AgelongTree"):
self._dialect = model.Dialect.ALTREE
elif source.value == "ANCESTRIS":
self._dialect = model.Dialect.ANCESTRIS
return self._dialect
@dialect.setter
def dialect(self, value):
self._dialect = value
[docs] def GedcomLines(self, offset):
"""Generator method for *gedcom lines*.
Parameters
----------
offset : `int`
Position in the file to start reading.
Yields
------
line : `GedcomLine`
An object representing one line of GEDCOM file.
Raises
------
ParserError
Raised if lines have incorrect syntax.
Notes
-----
GEDCOM line grammar is defined in Chapter 1 of GEDCOM standard, it
consists of the level number, optional reference ID, tag name, and
optional value separated by spaces. Chaper 1 is pure grammar level,
it does not assign any semantics to tags or levels. Consequently
this method does not perform any operations on the lines other than
returning the lines in their order in file.
This method iterates over all lines in input file and converts each
line into `GedcomLine` class. It is an implementation detail used by
other methods, most clients will not need to use this method.
"""
self._file.seek(offset)
prev_gline: Optional[GedcomLine] = None
while True:
offset = self._file.tell()
line = self._file.readline() # stops at \n
if not line:
break
line = line.lstrip().rstrip(b"\r\n")
match = _re_GedcomLine.match(line)
if not match:
self._file.seek(offset)
lineno = guess_lineno(self._file)
line = line.decode(self._encoding, "ignore")
raise ParserError("Invalid syntax at line "
"{0}: `{1}'".format(lineno, line))
level = int(match.group('level'))
xref_id_bytes = match.group('xref')
xref_id: Optional[str]
if xref_id_bytes:
xref_id = xref_id_bytes.decode(self._encoding, self._errors)
else:
xref_id = None
tag = match.group('tag').decode(self._encoding, self._errors)
# simple structural integrity check
if prev_gline is not None:
if level - prev_gline.level > 1:
# nested levels should be incremental (+1)
self._file.seek(offset)
lineno = guess_lineno(self._file)
line = line.decode(self._encoding, "ignore")
raise IntegrityError("Structural integrity - "
"illegal level nesting at line "
"{0}: `{1}'".format(lineno, line))
if tag in ("CONT", "CONC"):
# CONT/CONC level must be +1 from preceding non-CONT/CONC
# record or the same as preceding CONT/CONC record
if ((prev_gline.tag in ("CONT", "CONC") and
level != prev_gline.level) or
(prev_gline.tag not in ("CONT", "CONC") and
level - prev_gline.level != 1)):
self._file.seek(offset)
lineno = guess_lineno(self._file)
line = line.decode(self._encoding, "ignore")
raise IntegrityError("Structural integrity - illegal "
"CONC/CONT nesting at line "
"{0}: `{1}'".format(lineno, line))
gline = GedcomLine(level=level,
xref_id=xref_id,
tag=tag,
value=match.group('value'),
offset=offset)
yield gline
prev_gline = gline
[docs] def records0(self, tag=None):
"""Iterator over level=0 records with given tag.
This is the main method of this class. Clients access data in GEDCOM
files by iterating over level=0 records and then navigating to
sub-records using the methods of the `~ged4py.model.Record` class.
Parameters
----------
tag : `str`, optional
If tag is ``None`` (default) then return all level=0 records,
otherwise return level=0 records with the given tag.
Yields
------
record : `~ged4py.model.Record`
Instances of `~ged4py.model.Record` or its subclasses.
"""
_log.debug("in records0")
for offset, xtag in self.index0:
_log.debug(" records0: offset: %s; xtag: %s", offset, xtag)
if tag is None or tag == xtag:
yield self.read_record(offset)
[docs] def read_record(self, offset):
"""Read next complete record from a file starting at given position.
Reads the record at given position and all its sub-records. Stops
reading at EOF or next record with the same or higher (smaller) level
number. File position after return from this method is not specified,
re-position file if you want to read other records.
This is mostly for internal use, regular clients don't need to use it.
Parameters
----------
offset : `int`
Position in the file to start reading.
Returns
-------
record : `~ged4py.model.Record` or ``None``
`model.Record` instance or None if offset points past EOF.
Raises
------
ParserError
Raised if `offsets` does not point to the beginning of a record or
for any parsing errors.
"""
_log.debug("in read_record(%s)", offset)
stack: List[Optional[model.Record]] = [] # stores per-level current records
reclevel: Optional[int] = None
for gline in self.GedcomLines(offset):
_log.debug(" read_record, gline: %s", gline)
level = gline.level
if reclevel is None:
# this is the first record, remember its level
reclevel = level
elif level <= reclevel:
# stop at the record of the same or higher (smaller) level
break
# All previously seen records at this level and below can
# be finalized now
for rec in reversed(stack[level:]):
# decode bytes value into string
if rec:
if rec.value is not None:
rec.value = rec.value.decode(self._encoding,
self._errors)
rec.freeze()
# _log.debug(" read_record, rec: %s", rec)
del stack[level + 1:]
# extend stack to fit this level (and make parent levels if needed)
stack.extend([None] * (level + 1 - len(stack)))
# make Record out of it (it can be updated later)
parent = stack[level - 1] if level > 0 else None
rec = self._make_record(parent, gline)
# store as current record at this level
stack[level] = rec
for rec in reversed(stack[reclevel:]):
if rec:
if rec.value is not None:
rec.value = rec.value.decode(self._encoding, self._errors)
rec.freeze()
_log.debug(" read_record, rec: %s", rec)
if stack:
assert reclevel is not None
return stack[reclevel]
else:
return None
def _make_record(self, parent, gline):
"""Process next record.
This method created new record from the line read from file if
needed and/or updates its parent record. If the parent record tag
is ``BLOB`` and new record tag is ``CONT`` then record is skipped
entirely and None is returned. Otherwise if new record tag is ``CONT``
or ``CONC`` its value is added to parent value. For all other tags
new record is made and it is added to parent sub_records attribute.
Parameters
----------
parent : `ged4py.model.Record`
Parent record of the new record
gline : `GedcomLine`
Current parsed line
Returns
-------
record : `ged4py.model.Record` or None
"""
if parent and gline.tag in ("CONT", "CONC"):
# concatenate, only for non-BLOBs
if parent.tag != "BLOB":
# have to be careful concatenating empty/None values
value = gline.value
if gline.tag == "CONT":
value = b"\n" + (value or b"")
if value is not None:
parent.value = (parent.value or b"") + value
return None
# avoid infinite cycle
dialect = model.Dialect.DEFAULT
if not (gline.level == 0 and gline.tag == "HEAD") and self._header:
dialect = self.dialect
rec = model.make_record(level=gline.level, xref_id=gline.xref_id,
tag=gline.tag, value=gline.value,
sub_records=[], offset=gline.offset,
dialect=dialect, parser=self)
# add to parent's sub-records list
if parent:
parent.sub_records.append(rec)
return rec
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self._file.close()