Source code for Gutenberg_Dataset

import logging
import os
import re
import time

from enum import Enum
from urllib.request import urlopen


class Gutenberg_Dataset:
    """A fuzzy, lightweight class to access, search and filter Project Gutenberg resources.

    GutenbergLib by default uses a mirror's root URL. Alternatively, you can specify a local
    directory containing a Gutenberg mirror. That mirror directory needs to contain a
    GUTINDEX.ALL file and typically has many sub-directories `0`, .. `n`.
    A mirror of Project Gutenberg can be created by:

    .. code-block:: console

        #!/bin/bash
        rsync -zarv --dry-run --prune-empty-dirs --del --include="*/" --include='*.'{txt,pdf,ALL} --exclude="*" aleph.gutenberg.org::gutenberg ./gutenberg_mirror

    You can remove the PDF files, since they are currently not used, and you need to review
    the `--dry-run` option.

    Note: :func:`~Gutenberg_Dataset.Gutenberg_Dataset.load_index` needs to be called before
    any other methods.

    :param root_url: URL of Project Gutenberg or any mirror URL, or a local directory
        containing a Gutenberg mirror.
    :param cache_dir: path to a directory that will be used to cache the Gutenberg index and
        already downloaded texts. The cache directory is only used if a remote Gutenberg URL
        and not a local mirror is used.
    """

    def __init__(
        self, root_url="https://www.gutenberg.org/dirs", cache_dir="gutenberg"
    ):
        # old root, vanished: http://www.mirrorservice.org/sites/ftp.ibiblio.org/pub/docs/books/gutenberg
        self.log = logging.getLogger("GutenbergLib")
        self.root_url = root_url
        self.index = None
        self.NEAR = 2048
        self.start_tokens = [
            "*** START OF THIS PROJECT",
            "E-text prepared by",
            "This book was generously provided by the ",
            "*** START OF THIS PROJECT GUTENBERG",
            "START OF THE PROJECT GUTENBERG",
        ]
        self.near_start_tokens = [
            "produced by ",
            "Produced by ",
            ", prepared by",
            "Transcriber's Note",
            "Transcriber's note:",
            "Anmerkungen zur Tanskription",
            "Distributed Proofreading Team",
            "offensichtliche Schreibfehler",
            "Inkonsistenzen in der Rechtschreibung",
            "Im Original",
            "Obvious printer errors",
            "spelling was kept",
            "ERRATA have been applied",
            "punctuation errors",
            "have been silently corrected",
            "changes to the text",
            "Transcriber note",
            "Transcriber Note",
            "_italic_",
            "Variable spelling",
        ]
        self.end_tokens = [
            "End of the Project Gutenberg",
            "*** END OF THIS PROJECT",
            "***END OF THE PROJECT GUTENBER",
            "Ende dieses Projekt Gutenberg",
            "*** END OF THE PROJECT GUTENBERG",
            "End of Project Gutenberg",
            "Transcriber's Note",
        ]
        self.local_mirror = False
        if root_url[:4] != "http":
            if not os.path.exists(root_url):
                self.log.error(
                    f"If root_url points to non-http URL, it must be an existing local directory containing a Gutenberg mirror: {root_url}"
                )
            else:
                index_path = os.path.join(root_url, "GUTINDEX.ALL")
                if not os.path.exists(index_path):
                    self.log.error(
                        f"GUTINDEX.ALL not found in {root_url}, this is not a valid Gutenberg mirror"
                    )
                else:
                    self.local_mirror = True
                    self.cache_dir = None
                    return
        if self.local_mirror is False:
            try:
                if not os.path.exists(cache_dir):
                    os.makedirs(cache_dir)
                self.cache_dir = cache_dir
            except Exception as e:
                self.cache_dir = None
                self.log.error(f"Failed to create cache directory {cache_dir}, {e}")

    def _parse_record(self, record, verbose=True):
        """Internal function to recreate consistent record information from near-freestyle text."""
        rl = record.split("\n")
        white = str(chr(160)) + str(chr(9)) + " "  # non-breaking space, TAB, and space
        ebook_no = ""
        while len(rl[0]) > 0 and rl[0][-1] in white:
            rl[0] = rl[0][:-1]
        while len(rl[0]) > 0 and not rl[0][-1] in white:
            ebook_no = rl[0][-1] + ebook_no
            rl[0] = rl[0][:-1]
        while len(rl[0]) > 0 and rl[0][-1] in white:
            rl[0] = rl[0][:-1]
        # Sanity check: pattern first, string second (the arguments were swapped)
        try:
            fa = re.findall(r"\A[0-9]+[A-C]\Z", ebook_no)
        except Exception as e:
            fa = None
            if verbose is True:
                self.log.warning(f"Failed to apply regex on >{ebook_no}<: {e}")
        if len(rl[0]) < 5 or fa is None or len(ebook_no) > 7:
            if verbose is True:
                print("-------------------------------------")
                print(record)
                print("- - - - - - - - - - - - - - - - - - -")
                print(f"Dodgy record: {rl[0]}")
                print(f"  ebook-id: >{ebook_no}<")
            return None
        for i in range(len(rl)):
            rl[i] = rl[i].strip()
        p = 0
        while p < len(rl) - 1:
            if len(rl[p + 1]) == 0:
                print(f"Invalid rec: {record}")
                p += 1
            else:
                if rl[p + 1][0] != "[":
                    rl[p] += " " + rl[p + 1]
                    del rl[p + 1]
                    if rl[p][-1] == "]":
                        p += 1
                else:
                    p += 1
        rec = {}
        l0 = rl[0].split(", by ")
        rec["title"] = l0[0]
        rec["ebook_id"] = ebook_no
        # if len(l0) > 2:
        #     print(f"Chaos title: {rl[0]}")
        if len(l0) > 1:
            rec["author"] = l0[-1]
        for r in rl[1:]:
            if r[0] != "[" or r[-1] != "]":
                if r[0] == "[":
                    ind = r.rfind("]")
                    if ind != -1:
                        # print(f"Garbage trail {r}")
                        r = r[: ind + 1]
                        # print(f"Fixed: {r}")
                    else:
                        # print(f"Missing closing ] {r}")
                        r += "]"
                        # print(f"Fixed: {r}")
            if r[0] == "[" and r[-1] == "]":
                r = r[1:-1]
                i1 = r.find(":")
                if i1 == -1:
                    r = r.replace("Author a.k.a.", "Author a.k.a.:")
                    i1 = r.find(":")
                if i1 != -1:
                    i2 = r[i1:].find(" ") + i1
                else:
                    i2 = -1
                if i1 == -1 and i2 == -1:
                    pass
                else:
                    if i2 - i1 == 1:
                        key = r[:i1]
                        val = r[i2 + 1 :]
                        if (
                            "[" in key
                            or "]" in key
                            or "[" in val
                            or "]" in val
                            or len(key) > 15
                        ):
                            pass
                        else:
                            rec[key.strip().lower()] = val.strip()
                    else:
                        pass
            else:
                pass
                # print(f"Invalid attribute in {rl}::{r}")
        if len(rec) > 1:
            if "language" not in rec.keys():
                rec["language"] = "English"
        return rec

    def _parse_index(self, lines):
        """Internal function to parse the fuzzy, text-based Gutenberg table of contents."""

        class State(Enum):
            NONE = (1,)
            SYNC_START = (2,)
            SYNC_REC = (3,)
            END = 5

        white = str(chr(160)) + str(chr(9)) + " "  # non-breaking space, TAB, and space
        state = State.NONE
        start_token = "~ ~ ~ ~"
        stop_token = ["====="]
        end_token = "<==End"
        ignore_headers = ["TITLE and AUTHOR"]
        ignore_content = [
            "Not in the Posted Archives",
            "human-read audio ebooks",
            "Audio:",
        ]
        empty_lines = 0
        records = []
        for line in lines:
            if line[: len(end_token)] == end_token:
                state = State.END
                break
            if state == State.NONE:
                if line[: len(start_token)] == start_token:
                    state = State.SYNC_START
                    empty_lines = 0
                continue
            if state == State.SYNC_START:
                if len(line.strip()) == 0:
                    empty_lines += 1
                    if empty_lines > 1:
                        state = State.NONE
                    continue
                else:
                    stopped = False
                    for stop in stop_token:
                        if line[: len(stop)] == stop:
                            stopped = True
                            break
                    if stopped is True:
                        state = State.NONE
                        empty_lines = 0
                        continue
                    ignore = False
                    for header in ignore_headers:
                        if line[: len(header)] == header:
                            empty_lines = 0
                            ignore = True
                    for token in ignore_content:
                        if token in line:
                            empty_lines = 0
                            ignore = True
                    if ignore is True:
                        continue
                    rec = line
                    state = State.SYNC_REC
                    continue
            if state == State.SYNC_REC:
                if len(line.strip()) == 0 or line[0] not in white:
                    if len(records) < 10:
                        parsed_rec = self._parse_record(rec, verbose=True)
                    else:
                        parsed_rec = self._parse_record(rec, verbose=False)
                    if parsed_rec is not None:
                        records.append(parsed_rec)
                    empty_lines = 1
                    if len(line.strip()) == 0:
                        state = State.SYNC_START
                        continue
                    else:
                        rec = line
                        continue
                rec = rec + "\n" + line
        return records
    def load_index(self, cache=True, cache_expire_days=30):
        """Load the Gutenberg record index, either from cache or from the website.

        This should be the first method called, since many other methods rely on the index
        being loaded.

        :param cache: default `True`, use the cache directory to cache both the index and
            text files. The index expires after `cache_expire_days`, text files never expire.
            Should *not* be set to `False`, in order to prevent unnecessary re-downloading.
        :param cache_expire_days: number of days after which the index is re-downloaded.
        """
        raw_index = None
        if self.local_mirror is False:
            if self.cache_dir is None:
                self.log.error("Cannot cache library index, no valid cache directory.")
                return False
            ts_file = os.path.join(self.cache_dir, "timestamp")
            cache_file = os.path.join(self.cache_dir, "gutenberg_index")
            expired = True
            read_from_cache = False
            if os.path.isfile(ts_file) and os.path.isfile(cache_file):
                try:
                    with open(ts_file, "r") as f:
                        ts = float(f.read())
                    if time.time() - ts < cache_expire_days * 24 * 3600:
                        expired = False
                        read_from_cache = True
                        self.log.debug("Cache timestamp read.")
                    else:
                        self.log.debug(
                            "Cache for Gutenberg-index is expired, reloading from web."
                        )
                except Exception as e:
                    self.log.warning(
                        f"Failed to read cache timestamp ({e}), reloading from web."
                    )
            if expired is False and os.path.isfile(cache_file):
                try:
                    with open(cache_file, "r") as f:
                        raw_index = f.read()
                        self.log.debug(
                            f"Gutenberg index read from local cache: {cache_file}"
                        )
                except Exception as e:
                    expired = True
                    self.log.warning(
                        f"Failed to read cached index ({e}), reloading from web."
                    )
            if expired is True:
                index_url = self.root_url + "/GUTINDEX.ALL"
                try:
                    raw_index = urlopen(index_url).read().decode("utf-8")
                    if raw_index[0] == "\ufeff":  # Ignore BOM
                        raw_index = raw_index[1:]
                    raw_index = raw_index.replace("\r", "")
                    self.log.debug(f"Gutenberg index read from {index_url}")
                except Exception as e:
                    self.log.error(
                        f"Failed to download Gutenberg index from {index_url}, {e}"
                    )
                    return False
            if cache is True and read_from_cache is False:
                try:
                    with open(ts_file, "w") as f:
                        f.write(str(time.time()))
                    self.log.debug("Wrote cache timestamp.")
                except Exception as e:
                    self.log.error(f"Failed to write cache timestamp to {ts_file}, {e}")
                try:
                    with open(cache_file, "w") as f:
                        f.write(raw_index)
                    self.log.debug("Wrote cached index.")
                except Exception as e:
                    self.log.error(f"Failed to write cached index to {cache_file}, {e}")
        else:
            index_file = os.path.join(self.root_url, "GUTINDEX.ALL")
            try:
                with open(index_file, "r") as f:
                    raw_index = f.read()
                    if raw_index[0] == "\ufeff":  # Ignore BOM
                        raw_index = raw_index[1:]
                    raw_index = raw_index.replace("\r", "")
                    self.log.debug(
                        f"Gutenberg index read from local mirror: {index_file}"
                    )
            except Exception as e:
                self.log.error(
                    f"Failed to read Gutenberg index from local mirror: {index_file}, {e}"
                )
                return
        lines = raw_index.split("\n")
        self.records = self._parse_index(lines)
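    # Usage sketch (illustrative, not part of the library): load the index once and inspect
    # the parsed records. Assumes network access to the configured root_url (or a valid
    # local mirror) so that GUTINDEX.ALL can be retrieved.
    #
    #   gd = Gutenberg_Dataset(cache_dir="gutenberg")
    #   gd.load_index()          # downloads (or reads cached) GUTINDEX.ALL and parses it
    #   print(len(gd.records))   # number of parsed book records
    #   print(gd.records[0])     # e.g. {'title': ..., 'ebook_id': ..., 'author': ..., 'language': ...}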
    def load_book(self, ebook_id):
        """Get the text of an ebook from Gutenberg by ebook_id.

        This function returns the unfiltered raw text including all Gutenberg headers and
        footers. Use :func:`~Gutenberg_Dataset.Gutenberg_Dataset.get_book` to retrieve a
        dictionary with metadata and filtered text.

        :param ebook_id: Gutenberg id (Note: string, since this sometimes contains a character!)
        :returns: tuple of the unfiltered book text (or None on failure) and a download flag;
            the text can be filtered with :func:`~Gutenberg_Dataset.Gutenberg_Dataset.filter_text`
        """
        txt, dl, val = self._load_book_ex(ebook_id)
        if val is True:
            return txt, dl
        else:
            return None, dl
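    # Example (sketch): load_book returns the raw, unfiltered text plus a flag telling whether
    # a remote download happened. The ebook_id "2600" is used purely for illustration.
    #
    #   raw_text, was_downloaded = gd.load_book("2600")
    #   if raw_text is not None:
    #       clean_text = gd.filter_text(raw_text)   # strip Gutenberg header/footer cruft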
    def _read_download(self, filenames, path_stub, cache_name):
        """Internal function to read an ebook from cache or download it."""
        cache_file = None
        downloaded = False
        if self.cache_dir is not None:
            cache_file = os.path.join(self.cache_dir, cache_name)
            if os.path.isfile(cache_file):
                try:
                    with open(cache_file, "r") as f:
                        data = f.read()
                        self.log.debug(f"Book read from cache at {cache_file}")
                        downloaded = False
                        return data, None, downloaded
                except Exception as e:
                    self.log.error(f"Failed to read cached file {cache_file}: {e}")
        data = None
        file_url = None
        for filename, encoding in filenames:
            file_url = self.root_url + path_stub + filename
            if self.local_mirror is False:
                try:
                    if encoding != "bin":
                        data = urlopen(file_url).read().decode(encoding)
                    else:
                        data = urlopen(file_url).read()
                    self.log.debug(f"Book read from {file_url}")
                    downloaded = True
                    break
                except Exception as e:
                    self.log.debug(f"Failed to download {file_url}, {e}")
            else:
                try:
                    if encoding != "bin":
                        with open(file_url, "r", encoding=encoding) as f:
                            data = f.read()
                    else:
                        with open(file_url, "rb") as f:
                            data = f.read()
                    self.log.debug(f"Book read from local mirror at {file_url}")
                    downloaded = False
                    break
                except Exception as e:
                    self.log.debug(f"Failed to read local mirror at {file_url}, {e}")
        return data, cache_file, downloaded

    def _load_book_ex(self, ebook_id):
        """Internal function to get the text of an ebook from Gutenberg by ebook_id, including
        remote-download information.

        This function returns the unfiltered raw text including all Gutenberg headers and
        footers, plus a boolean flag that is 'True' if the book was downloaded from a remote
        source and 'False' if it was retrieved from cache without remote access. The flag can
        be used to control or limit the number of books downloaded from the remote source.
        Use :func:`~Gutenberg_Dataset.Gutenberg_Dataset.get_book` to retrieve a dictionary
        with metadata and filtered text.

        :param ebook_id: Gutenberg id (Note: string, since this sometimes contains a character!)
        :returns: tuple of the unfiltered book text as string, a flag that is 'True' if the
            book was downloaded from remote, and a validity flag, 'True' indicating valid text.
        """
        if ebook_id is None or len(ebook_id) == 0:
            self.log.error("No ebook_id given.")
            return None, None, None
        if ebook_id[-1] == "C":
            ebook_id = ebook_id[:-1]
        path_stub = ""
        downloaded = False
        valid = False
        for i in range(len(ebook_id) - 1):
            path_stub += "/" + ebook_id[i]
        path_stub += "/" + ebook_id + "/"
        filenames = [
            (ebook_id + "-0.txt", "utf-8"),
            (ebook_id + ".txt", "utf-8"),
            (ebook_id + "-8.txt", "latin1"),
            (ebook_id + ".txt", "latin1"),
        ]
        cache_name = ebook_id + ".txt"
        data, cache_file, downloaded = self._read_download(
            filenames, path_stub, cache_name
        )
        if data is not None:
            if data[0] == "\ufeff":  # Ignore BOM
                data = data[1:]
            data = data.replace("\r", "")
            valid = True
        else:
            filenames = [(ebook_id + "-pdf.pdf", "bin")]
            cache_name = ebook_id + ".pdf"
            data, cache_file, downloaded = self._read_download(
                filenames, path_stub, cache_name
            )
            if data is not None:
                self.log.error(
                    f"Ebook {cache_name} is only available in PDF format, this is not supported."
                )
            else:
                self.log.warning(f"Failed to download {filenames}, skipping book.")
            return None, downloaded, False
        if cache_file is not None:
            try:
                with open(cache_file, "w") as f:
                    f.write(data)
            except Exception as e:
                self.log.error(f"Failed to cache file {cache_file}: {e}")
        return data, downloaded, valid
    def filter_text(
        self,
        book_text,
        add_start_tokens=None,
        add_near_start_tokens=None,
        add_end_tokens=None,
    ):
        """Heuristically remove header and trailer texts that are not part of the actual book.

        Unfortunately, the formatting of Gutenberg books is an unbelievable mess. Using the
        token lists `self.start_tokens` (indicating the start of the actual book text),
        `self.near_start_tokens` (possibly ambiguous tokens near a `start_tokens` token,
        further narrowing down the start of the text), and `self.end_tokens` (indicating the
        end of the book text), this function tries to find the start and end of the book
        text. The user can either extend the lists of class member tokens, or provide
        temporary additional tokens as parameters to this function.

        The list of `start_tokens` contains only tokens that are always significant parts of
        the header cruft (e.g. 'START OF THIS GUTENBERG'). `near_start_tokens` are tokens
        that might be ambiguous, but are still part of the header cruft (e.g. 'produced by').
        `near_start_tokens` are only used if they are within `self.NEAR` bytes of the latest
        `start_tokens` token, to heuristically prevent false positives.

        *Note:* Use logging via `logging.basicConfig(level=logging.DEBUG)` to analyze the
        filtering process.

        :param book_text: text of the book (string)
        :param add_start_tokens: additional start tokens (list of strings)
        :param add_near_start_tokens: additional near start tokens (list of strings)
        :param add_end_tokens: additional end tokens (list of strings)
        :returns: filtered text (string)
        """
        # Copy the class member lists so that additional tokens remain temporary,
        # as documented, and do not permanently extend the class state.
        start_tokens = list(self.start_tokens)
        if add_start_tokens is not None:
            start_tokens.extend(add_start_tokens)
        near_start_tokens = list(self.near_start_tokens)
        if add_near_start_tokens is not None:
            near_start_tokens.extend(add_near_start_tokens)
        end_tokens = list(self.end_tokens)
        if add_end_tokens is not None:
            end_tokens.extend(add_end_tokens)
        if book_text is None:
            self.log.warning("Filter: book text is None, returning None")
            return None
        blen = len(book_text)
        pstart = 0
        for token in start_tokens:
            pos = book_text.find(token)
            if pos > pstart:
                pstart = pos
                self.log.debug(f"Start-token [{token}] found at position {pos}")
        if pstart > 0:
            pos = book_text[pstart:].find("\n\n")
            if pos >= 0 and pos <= self.NEAR:
                pos += pstart
                while book_text[pos] == "\n":
                    pos += 1  # eof?!
                pstart = pos
        if pstart > blen / 2:
            self.log.warning("Preamble is taking more than half of the book!")
        new_book = book_text[pstart:]
        xpos = -1
        for token in near_start_tokens:
            pos = new_book.find(token)
            if pos >= 0 and pos <= self.NEAR:
                self.log.debug(f"Near-Start-token [{token}] found at position {pos}")
                if pos > xpos:
                    xpos = pos
        if xpos > -1:
            pos2 = new_book[xpos:].find("\n\n")
            if pos2 <= self.NEAR and pos2 > 0:
                self.log.debug(f"Trying extra skipping (2) for {pos2}...")
                while new_book[xpos + pos2] == "\n":
                    pos2 += 1
                new_book = new_book[xpos + pos2 :]
                self.log.debug(f"Additionally shortened start by {xpos+pos2} chars")
            else:
                pos2 = new_book[xpos:].find("\n")
                if pos2 <= self.NEAR and pos2 > 0:
                    self.log.debug(f"Trying extra skipping (3) for {pos2}...")
                    while new_book[xpos + pos2] == "\n":
                        pos2 += 1
                    new_book = new_book[xpos + pos2 :]
                    self.log.debug(
                        f"Additionally shortened start by {xpos+pos2}, {xpos}+{pos2} chars"
                    )
                else:
                    pos2 = 0
                    new_book = new_book[xpos + pos2 :]
        pend = len(new_book)
        for token in end_tokens:
            pos = new_book.find(token)
            if pos != -1 and pos < pend:
                self.log.debug(f"End-token [{token}] found at pos {pos}")
                pend = pos
        if pend < len(new_book):
            pos = new_book[:pend].rfind("\n\n")
            if pos > 0:
                while new_book[pos] == "\n":
                    pos -= 1  # eof?!
                pend = pos + 1
        else:
            self.log.debug("No end token found!")
        if pend < len(new_book) / 2:
            self.log.debug("End-text is taking more than half of the book!")
        new_book = new_book[:pend]
        return new_book
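    # Example (sketch): extending the filter heuristics with temporary tokens for a single
    # call. The token strings below are hypothetical illustrations, not part of the built-in
    # token lists.
    #
    #   logging.basicConfig(level=logging.DEBUG)   # show which tokens matched
    #   clean = gd.filter_text(
    #       raw_text,
    #       add_start_tokens=["*** START OF THE PROJECT GUTENBERG EBOOK"],
    #       add_end_tokens=["*** END OF THE PROJECT GUTENBERG EBOOK"],
    #   )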
    def find_keywords(self, *search_keys):
        """Search for an arbitrary number of keywords in a book record.

        *Note:* :func:`~Gutenberg_Dataset.Gutenberg_Dataset.load_index` needs to be called
        once before this function can be used.

        :returns: list of records that contain all keywords in any field.
        """
        frecs = []
        for rec in self.records:
            found = True
            for sk in search_keys:
                subkey = False
                for key in rec.keys():
                    if sk.lower() in key.lower() or sk.lower() in rec[key].lower():
                        subkey = True
                        break
                if subkey is False:
                    found = False
                    break
            if found is True:
                frecs += [rec]
        return frecs
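    # Example (sketch): keyword search matches against every field of a record, so the call
    # below returns records whose title, author, or any other field contains both substrings
    # (case-insensitive). The keywords are arbitrary illustration values.
    #
    #   hits = gd.find_keywords("kant", "critique")
    #   titles = [r["title"] for r in hits]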
    def search(self, search_dict):
        """Search for book records with specific key values.

        For a list of valid keys, use `get_record_keys()`.
        Standard keys are: `ebook_id`, `author`, `language`, `title`.

        *Note:* :func:`~Gutenberg_Dataset.Gutenberg_Dataset.load_index` needs to be called
        once before this function can be used.

        Example: `search({"title": ["philosoph", "phenomen", "physic", "hermeneu", "logic"],
        "language": "english"})` finds all English books whose titles contain at least one of
        the keywords. Each search value can either be a single keyword (e.g. "english") or a
        list of keywords.

        :returns: list of records
        """
        if not hasattr(self, "records") or self.records is None:
            self.log.debug("Index not loaded, trying to load...")
            self.load_index()
        frecs = []
        for rec in self.records:
            found = True
            for sk in search_dict:
                if sk not in rec:
                    found = False
                    break
                else:
                    skl = search_dict[sk]
                    if not isinstance(skl, list):
                        skl = [skl]
                    nf = 0
                    for skli in skl:
                        if skli.lower() in rec[sk].lower():
                            nf = nf + 1
                    if nf == 0:
                        found = False
                        break
            if found is True:
                frecs += [rec]
        return frecs
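    # Example (sketch, taken from the docstring above): values may be single strings or lists
    # of alternatives; every key of the search dictionary must match a record field.
    #
    #   recs = gd.search(
    #       {"title": ["philosoph", "phenomen", "physic", "hermeneu", "logic"],
    #        "language": "english"}
    #   )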
    def insert_book_texts(self, search_dict, download_count_limit=20, skip_ids=[]):
        """Insert book texts into the records returned by :func:`~Gutenberg_Dataset.Gutenberg_Dataset.search`.

        In order to prevent the download of too many books, the number of downloads is
        limited to `download_count_limit`. Downloaded books are cached, and cached books do
        not count towards the download limit. Calling this function again will download books
        that have not been downloaded yet. The filtered book content is inserted into each
        record dictionary under the key `text`.

        :param search_dict: list of record dictionaries that at least contain the key `ebook_id`.
        :param download_count_limit: maximum number of books to download, if no local mirror
            is used. No limits apply for local mirrors.
        :param skip_ids: list of ebook_ids (string format!) to skip downloading.
        :returns: list of records including the filtered book text in the `text` field.
        """
        dls = 0
        delete_ids = []
        for i in range(0, len(search_dict)):
            if search_dict[i]["ebook_id"] in skip_ids:
                self.log.debug(
                    f"Skipping id={search_dict[i]['ebook_id']}, {search_dict[i]['title']}"
                )
                # delete entry from search_dict
                delete_ids.append(i)
                continue
            self.log.debug(
                f"Getting id={search_dict[i]['ebook_id']}, {search_dict[i]['title']}"
            )
            bt, dl, val = self._load_book_ex(search_dict[i]["ebook_id"])
            if bt is None or val is False:
                if val is False:
                    self.log.warning(
                        f"Download of book {search_dict[i]['ebook_id']}, {search_dict[i]['title']}: invalid format!"
                    )
                else:
                    self.log.error(
                        f"Download of book {search_dict[i]['ebook_id']}, {search_dict[i]['title']} failed!"
                    )
                continue
            search_dict[i]["text"] = self.filter_text(bt)
            if dl is True and self.local_mirror is False:
                dls += 1
                if dls > download_count_limit:
                    self.log.error(
                        f"Download limit reached ({download_count_limit}), stopping download..."
                    )
                    break
        # reverse delete_ids to avoid index shifting
        for i in reversed(delete_ids):
            del search_dict[i]
        return search_dict
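    # Example (sketch): fetch the filtered texts for a search result, limiting remote
    # downloads. Cached books do not count towards the limit, so repeated calls gradually
    # complete the collection. The search terms are arbitrary illustration values.
    #
    #   recs = gd.search({"author": "nietzsche", "language": "german"})
    #   recs = gd.insert_book_texts(recs, download_count_limit=10)
    #   for r in recs:
    #       if "text" in r:
    #           print(r["title"], len(r["text"]))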
    def get_book(self, ebook_id: str):
        """Get a book record's metadata and filtered text by its ebook_id.

        This function returns a dictionary with metadata and filtered text. Use
        :func:`~Gutenberg_Dataset.Gutenberg_Dataset.load_book` to get the raw unfiltered text.

        *Note:* :func:`~Gutenberg_Dataset.Gutenberg_Dataset.load_index` needs to be called
        once before this function can be used.

        :param ebook_id: ebook_id (string, since some IDs contain letters) of the book to be retrieved
        :returns: book record (dictionary with metadata and filtered text)
        """
        for rec in self.records:
            if rec["ebook_id"] == ebook_id:
                text, _, valid = self._load_book_ex(ebook_id)
                if text is None or valid is False:
                    self.log.error(f"Download of book {ebook_id} failed!")
                    return None
                rec["text"] = self.filter_text(text)
                return rec
        return None
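    # Example (sketch): get_book combines the index metadata with the filtered text in a
    # single record. The ebook_id "1342" is only an illustration.
    #
    #   rec = gd.get_book("1342")
    #   if rec is not None:
    #       print(rec["title"], rec.get("author"), len(rec["text"]))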
    def get_record_keys(self):
        """Get a list of all keys that are used within records.

        Standard keys are: `ebook_id`, `author`, `language`, `title`.

        *Note:* :func:`~Gutenberg_Dataset.Gutenberg_Dataset.load_index` needs to be called
        once before this function can be used.

        :returns: list of all different keys that are used in any record.
        """
        rks = []
        for r in self.records:
            rks = set(list(rks) + list(r.keys()))
        return rks
    def get_unique_record_values(self, key):
        """Get a list of all unique values a given key has over all records.

        *Note:* :func:`~Gutenberg_Dataset.Gutenberg_Dataset.load_index` needs to be called
        once before this function can be used.

        Example: `get_unique_record_values('language')` returns all languages used in Gutenberg records.

        :param key: key to search for.
        :returns: list of all unique values for the given key.
        """
        uv = []
        if key not in self.get_record_keys():
            self.log.warning(f"{key} is not a key used in any record!")
            return None
        for r in self.records:
            if key in r:
                uv = set(list(uv) + [r[key]])
        uv = sorted(uv)
        return uv
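
# Minimal end-to-end sketch (not part of the library API): build a small corpus of English
# texts. Assumes network access to the default Gutenberg mirror; the search terms and the
# download limit are arbitrary illustration values.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    gd = Gutenberg_Dataset(cache_dir="gutenberg")
    gd.load_index()  # must be called before search/get_book/insert_book_texts
    print(f"Some languages in the index: {gd.get_unique_record_values('language')[:10]}")
    books = gd.search({"title": ["philosoph", "logic"], "language": "english"})
    books = gd.insert_book_texts(books, download_count_limit=5)
    for book in books:
        if "text" in book:
            print(f"{book['ebook_id']}: {book['title']} ({len(book['text'])} chars)")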