Source code for calibre.ebooks.oeb.polish.container

#!/usr/bin/env python
# License: GPLv3 Copyright: 2013, Kovid Goyal <kovid at kovidgoyal.net>


import errno
import hashlib
import logging
import os
import re
import shutil
import sys
import unicodedata
import uuid
from collections import defaultdict
from io import BytesIO
from itertools import count

from css_parser import getUrls, replaceUrls

from calibre import CurrentDir, walk
from calibre.constants import iswindows
from calibre.customize.ui import plugin_for_input_format, plugin_for_output_format
from calibre.ebooks import escape_xpath_attr
from calibre.ebooks.chardet import xml_to_unicode
from calibre.ebooks.conversion.plugins.epub_input import ADOBE_OBFUSCATION, IDPF_OBFUSCATION, decrypt_font_data
from calibre.ebooks.conversion.preprocess import CSSPreProcessor as cssp
from calibre.ebooks.conversion.preprocess import HTMLPreProcessor
from calibre.ebooks.metadata.opf3 import CALIBRE_PREFIX, ensure_prefix, items_with_property, read_prefixes
from calibre.ebooks.metadata.utils import parse_opf_version
from calibre.ebooks.mobi import MobiError
from calibre.ebooks.mobi.reader.headers import MetadataHeader
from calibre.ebooks.oeb.base import (
    DC11_NS,
    OEB_DOCS,
    OEB_STYLES,
    OPF,
    OPF2_NS,
    Manifest,
    itercsslinks,
    iterlinks,
    rewrite_links,
    serialize,
    urlquote,
    urlunquote,
)
from calibre.ebooks.oeb.parse_utils import NotHTML, parse_html
from calibre.ebooks.oeb.polish.errors import DRMError, InvalidBook
from calibre.ebooks.oeb.polish.parsing import parse as parse_html_tweak
from calibre.ebooks.oeb.polish.utils import OEB_FONTS, CommentFinder, PositionFinder, adjust_mime_for_epub, guess_type, parse_css
from calibre.ptempfile import PersistentTemporaryDirectory, PersistentTemporaryFile
from calibre.utils.filenames import hardlink_file, nlinks_file, retry_on_fail
from calibre.utils.ipc.simple_worker import WorkerError, fork_job
from calibre.utils.logging import default_log
from calibre.utils.xml_parse import safe_xml_fromstring
from calibre.utils.zipfile import ZipFile
from polyglot.builtins import iteritems
from polyglot.urllib import urlparse

exists, join, relpath = os.path.exists, os.path.join, os.path.relpath
OPF_NAMESPACES = {'opf':OPF2_NS, 'dc':DC11_NS}
null = object()
OEB_FONTS  # for plugin compat


class CSSPreProcessor(cssp):

    def __call__(self, data):
        return self.MS_PAT.sub(self.ms_sub, data)


def clone_dir(src, dest):
    ' Clone a folder using hard links for the files, dest must already exist '
    for x in os.listdir(src):
        dpath = os.path.join(dest, x)
        spath = os.path.join(src, x)
        if os.path.isdir(spath):
            os.mkdir(dpath)
            clone_dir(spath, dpath)
        else:
            try:
                hardlink_file(spath, dpath)
            except:
                shutil.copy2(spath, dpath)


def clone_container(container, dest_dir):
    ' Efficiently clone a container using hard links '
    dest_dir = os.path.abspath(os.path.realpath(dest_dir))
    clone_data = container.clone_data(dest_dir)
    cls = type(container)
    if cls is Container:
        return cls(None, None, container.log, clone_data=clone_data)
    return cls(None, container.log, clone_data=clone_data)


def name_to_abspath(name, root):
    return os.path.abspath(join(root, *name.split('/')))


def abspath_to_name(path, root):
    return relpath(os.path.abspath(path), root).replace(os.sep, '/')


def name_to_href(name, root, base=None, quote=urlquote):
    fullpath = name_to_abspath(name, root)
    basepath = root if base is None else os.path.dirname(name_to_abspath(base, root))
    path = relpath(fullpath, basepath).replace(os.sep, '/')
    return quote(path)


def href_to_name(href, root, base=None):
    base = root if base is None else os.path.dirname(name_to_abspath(base, root))
    try:
        purl = urlparse(href)
    except ValueError:
        return None
    if purl.scheme or not purl.path:
        return None
    href = urlunquote(purl.path)
    if iswindows and ':' in href:
        # path manipulations on windows fail for paths with : in them, so we
        # assume all such paths are invalid/absolute paths.
        return None
    fullpath = os.path.join(base, *href.split('/'))
    try:
        return unicodedata.normalize('NFC', abspath_to_name(fullpath, root))
    except ValueError:
        return None


def seconds_to_timestamp(duration: float) -> str:
    seconds = int(duration)
    float_part = int((duration - seconds) * 1000)
    hours = seconds // 3600
    minutes = (seconds % 3600) // 60
    seconds = seconds % 60
    ans = f'{hours:02d}:{minutes:02d}:{seconds:02d}'
    if float_part:
        ans += f'.{float_part}'
    return ans


class ContainerBase:  # {{{
    '''
    A base class that implements just the parsing methods. Useful to create
    virtual containers for testing.
    '''

    #: The mode used to parse HTML and CSS (polishing uses tweak_mode=False and the editor uses tweak_mode=True)
    tweak_mode = False

    def __init__(self, log):
        self.log = log
        self.parsed_cache = {}
        self.mime_map = {}
        self.encoding_map = {}
        self.html_preprocessor = HTMLPreProcessor()
        self.css_preprocessor = CSSPreProcessor()

    def guess_type(self, name):
        ' Return the expected mimetype for the specified file name based on its extension. '
        return adjust_mime_for_epub(filename=name, opf_version=self.opf_version_parsed)

    def decode(self, data, normalize_to_nfc=True):
        """
        Automatically decode ``data`` into a ``unicode`` object.

        :param normalize_to_nfc: Normalize returned unicode to the NFC normal form as is required by both the EPUB and AZW3 formats.
        """
        def fix_data(d):
            return d.replace('\r\n', '\n').replace('\r', '\n')
        if isinstance(data, str):
            return fix_data(data)
        bom_enc = None
        if data[:4] in {b'\0\0\xfe\xff', b'\xff\xfe\0\0'}:
            bom_enc = {b'\0\0\xfe\xff':'utf-32-be',
                       b'\xff\xfe\0\0':'utf-32-le'}[data[:4]]
            data = data[4:]
        elif data[:2] in {b'\xff\xfe', b'\xfe\xff'}:
            bom_enc = {b'\xff\xfe':'utf-16-le', b'\xfe\xff':'utf-16-be'}[data[:2]]
            data = data[2:]
        elif data[:3] == b'\xef\xbb\xbf':
            bom_enc = 'utf-8'
            data = data[3:]
        if bom_enc is not None:
            try:
                self.used_encoding = bom_enc
                return fix_data(data.decode(bom_enc))
            except UnicodeDecodeError:
                pass
        try:
            self.used_encoding = 'utf-8'
            return fix_data(data.decode('utf-8'))
        except UnicodeDecodeError:
            pass
        data, self.used_encoding = xml_to_unicode(data)
        if normalize_to_nfc:
            data = unicodedata.normalize('NFC', data)
        return fix_data(data)

    def parse_xml(self, data):
        data, self.used_encoding = xml_to_unicode(
            data, strip_encoding_pats=True, assume_utf8=True, resolve_entities=True)
        data = unicodedata.normalize('NFC', data)
        return safe_xml_fromstring(data)

    def parse_xhtml(self, data, fname='<string>', force_html5_parse=False):
        if self.tweak_mode:
            return parse_html_tweak(data, log=self.log, decoder=self.decode, force_html5_parse=force_html5_parse)
        else:
            try:
                return parse_html(
                    data, log=self.log, decoder=self.decode,
                    preprocessor=self.html_preprocessor, filename=fname,
                    non_html_file_tags={'ncx'})
            except NotHTML:
                return self.parse_xml(data)

    def parse_css(self, data, fname='<string>', is_declaration=False):
        return parse_css(data, fname=fname, is_declaration=is_declaration, decode=self.decode, log_level=logging.WARNING,
                         css_preprocessor=(None if self.tweak_mode else self.css_preprocessor))
# }}}


[docs] class Container(ContainerBase): # {{{ ''' A container represents an open e-book as a folder full of files and an OPF file. There are two important concepts: * The root folder. This is the base of the e-book. All the e-books files are inside this folder or in its sub-folders. * Names: These are paths to the books' files relative to the root folder. They always contain POSIX separators and are unquoted. They can be thought of as canonical identifiers for files in the book. Most methods on the container object work with names. Names are always in the NFC Unicode normal form. * Clones: the container object supports efficient on-disk cloning, which is used to implement checkpoints in the e-book editor. In order to make this work, you should never access files on the filesystem directly. Instead, use :meth:`raw_data` or :meth:`open` to read/write to component files in the book. When converting between hrefs and names use the methods provided by this class, they assume all hrefs are quoted. ''' #: The type of book (epub for EPUB files and azw3 for AZW3 files) book_type = 'oeb' #: If this container represents an unzipped book (a directory) is_dir = False SUPPORTS_TITLEPAGES = True SUPPORTS_FILENAMES = True @property def book_type_for_display(self): return self.book_type.upper() def __init__(self, rootpath, opfpath, log, clone_data=None): ContainerBase.__init__(self, log) self.root = clone_data['root'] if clone_data is not None else os.path.abspath(rootpath) self.name_path_map = {} self.dirtied = set() self.pretty_print = set() self.cloned = False self.cache_names = ('parsed_cache', 'mime_map', 'name_path_map', 'encoding_map', 'dirtied', 'pretty_print') self.href_to_name_cache = {} if clone_data is not None: self.cloned = True for x in ('name_path_map', 'opf_name', 'mime_map', 'pretty_print', 'encoding_map', 'tweak_mode'): setattr(self, x, clone_data[x]) self.opf_dir = os.path.dirname(self.name_path_map[self.opf_name]) return # Map of relative paths with '/' separators from root of unzipped ePub # to absolute paths on filesystem with os-specific separators opfpath = os.path.abspath(os.path.realpath(opfpath)) all_opf_files = [] for dirpath, _dirnames, filenames in os.walk(self.root): for f in filenames: path = join(dirpath, f) name = self.abspath_to_name(path) self.name_path_map[name] = path self.mime_map[name] = guess_type(path) # Special case if we have stumbled onto the opf if path == opfpath: self.opf_name = name self.opf_dir = os.path.dirname(path) self.mime_map[name] = guess_type('a.opf') if path.lower().endswith('.opf'): all_opf_files.append((name, os.path.dirname(path))) if not hasattr(self, 'opf_name') and all_opf_files: self.opf_name, self.opf_dir = all_opf_files[0] self.mime_map[self.opf_name] = guess_type('a.opf') if not hasattr(self, 'opf_name'): raise InvalidBook('Could not locate opf file: %r'%opfpath) # Update mime map with data from the OPF self.refresh_mime_map() def refresh_mime_map(self): for item in self.opf_xpath('//opf:manifest/opf:item[@href and @media-type]'): href = item.get('href') try: name = self.href_to_name(href, self.opf_name) except ValueError: continue # special filenames such as CON on windows cause relpath to fail mt = item.get('media-type') if name in self.mime_map and name != self.opf_name and mt: # some epubs include the opf in the manifest with an incorrect mime type self.mime_map[name] = mt def data_for_clone(self, dest_dir=None): dest_dir = dest_dir or self.root return { 'root': dest_dir, 'opf_name': self.opf_name, 'mime_map': self.mime_map.copy(), 'pretty_print': set(self.pretty_print), 'encoding_map': self.encoding_map.copy(), 'tweak_mode': self.tweak_mode, 'name_path_map': { name:os.path.join(dest_dir, os.path.relpath(path, self.root)) for name, path in iteritems(self.name_path_map)} } def clone_data(self, dest_dir): Container.commit(self, keep_parsed=False) self.cloned = True clone_dir(self.root, dest_dir) return self.data_for_clone(dest_dir)
[docs] def add_name_to_manifest(self, name, process_manifest_item=None): ' Add an entry to the manifest for a file with the specified name. Returns the manifest id. ' all_ids = {x.get('id') for x in self.opf_xpath('//*[@id]')} c = 0 item_id = 'id' while item_id in all_ids: c += 1 item_id = 'id' + '%d'%c manifest = self.opf_xpath('//opf:manifest')[0] href = self.name_to_href(name, self.opf_name) item = manifest.makeelement(OPF('item'), id=item_id, href=href) item.set('media-type', self.mime_map[name]) self.insert_into_xml(manifest, item) if process_manifest_item is not None: process_manifest_item(item) self.dirty(self.opf_name) return item_id
[docs] def manifest_has_name(self, name): ''' Return True if the manifest has an entry corresponding to name ''' all_names = {self.href_to_name(x.get('href'), self.opf_name) for x in self.opf_xpath('//opf:manifest/opf:item[@href]')} return name in all_names
[docs] def make_name_unique(self, name): ''' Ensure that `name` does not already exist in this book. If it does, return a modified version that does not exist. ''' counter = count() while self.has_name_case_insensitive(name) or self.manifest_has_name(name): c = next(counter) + 1 base, ext = name.rpartition('.')[::2] if c > 1: base = base.rpartition('-')[0] name = '%s-%d.%s' % (base, c, ext) return name
[docs] def add_file(self, name, data, media_type=None, spine_index=None, modify_name_if_needed=False, process_manifest_item=None): ''' Add a file to this container. Entries for the file are automatically created in the OPF manifest and spine (if the file is a text document) ''' if '..' in name: raise ValueError('Names are not allowed to have .. in them') href = self.name_to_href(name, self.opf_name) if self.has_name_case_insensitive(name) or self.manifest_has_name(name): if not modify_name_if_needed: raise ValueError(('A file with the name %s already exists' % name) if self.has_name_case_insensitive(name) else ('An item with the href %s already exists in the manifest' % href)) name = self.make_name_unique(name) href = self.name_to_href(name, self.opf_name) path = self.name_to_abspath(name) base = os.path.dirname(path) if not os.path.exists(base): os.makedirs(base) with open(path, 'wb') as f: if hasattr(data, 'read'): shutil.copyfileobj(data, f) else: f.write(data) mt = media_type or self.guess_type(name) self.name_path_map[name] = path self.mime_map[name] = mt if self.ok_to_be_unmanifested(name): return name item_id = self.add_name_to_manifest(name, process_manifest_item=process_manifest_item) if mt in OEB_DOCS: manifest = self.opf_xpath('//opf:manifest')[0] spine = self.opf_xpath('//opf:spine')[0] si = manifest.makeelement(OPF('itemref'), idref=item_id) self.insert_into_xml(spine, si, index=spine_index) return name
[docs] def rename(self, current_name, new_name): ''' Renames a file from current_name to new_name. It automatically rebases all links inside the file if the folder the file is in changes. Note however, that links are not updated in the other files that could reference this file. This is for performance, such updates should be done once, in bulk. ''' if current_name in self.names_that_must_not_be_changed: raise ValueError('Renaming of %s is not allowed' % current_name) if self.exists(new_name) and (new_name == current_name or new_name.lower() != current_name.lower()): # The destination exists and does not differ from the current name only by case raise ValueError(f'Cannot rename {current_name} to {new_name} as {new_name} already exists') new_path = self.name_to_abspath(new_name) base = os.path.dirname(new_path) if os.path.isfile(base): raise ValueError(f'Cannot rename {current_name} to {new_name} as {base} is a file') if not os.path.exists(base): os.makedirs(base) old_path = parent_dir = self.name_to_abspath(current_name) self.commit_item(current_name) os.rename(old_path, new_path) # Remove empty directories while parent_dir: parent_dir = os.path.dirname(parent_dir) try: os.rmdir(parent_dir) except OSError: break for x in ('mime_map', 'encoding_map'): x = getattr(self, x) if current_name in x: x[new_name] = x[current_name] self.name_path_map[new_name] = new_path for x in self.cache_names: x = getattr(self, x) try: x.pop(current_name, None) except TypeError: x.discard(current_name) if current_name == self.opf_name: self.opf_name = new_name if os.path.dirname(old_path) != os.path.dirname(new_path): from calibre.ebooks.oeb.polish.replace import LinkRebaser repl = LinkRebaser(self, current_name, new_name) self.replace_links(new_name, repl) self.dirty(new_name)
[docs] def abspath_to_name(self, fullpath, root=None): ''' Convert an absolute path to a canonical name relative to :attr:`root` :param root: The base folder. By default the root for this container object is used. ''' # OS X silently changes all file names to NFD form. The EPUB # spec requires all text including filenames to be in NFC form. # The proper fix is to implement a VFS that maps between # canonical names and their file system representation, however, # I dont have the time for that now. Note that the container # ensures that all text files are normalized to NFC when # decoding them anyway, so there should be no mismatch between # names in the text and NFC canonical file names. return unicodedata.normalize('NFC', abspath_to_name(fullpath, root or self.root))
[docs] def name_to_abspath(self, name): ' Convert a canonical name to an absolute OS dependent path ' return name_to_abspath(name, self.root)
[docs] def exists(self, name): ''' True iff a file/folder corresponding to the canonical name exists. Note that this function suffers from the limitations of the underlying OS filesystem, in particular case (in)sensitivity. So on a case insensitive filesystem this will return True even if the case of name is different from the case of the underlying filesystem file. See also :meth:`has_name`''' return os.path.exists(self.name_to_abspath(name))
[docs] def href_to_name(self, href, base=None): ''' Convert an href (relative to base) to a name. base must be a name or None, in which case self.root is used. ''' key = href, base ans = self.href_to_name_cache.get(key, null) if ans is null: ans = self.href_to_name_cache[key] = href_to_name(href, self.root, base=base) return ans
[docs] def name_to_href(self, name, base=None): '''Convert a name to a href relative to base, which must be a name or None in which case self.root is used as the base''' return name_to_href(name, self.root, base=base)
[docs] def opf_xpath(self, expr): ' Convenience method to evaluate an XPath expression on the OPF file, has the opf: and dc: namespace prefixes pre-defined. ' return self.opf.xpath(expr, namespaces=OPF_NAMESPACES)
[docs] def has_name(self, name): ''' Return True iff a file with the same canonical name as that specified exists. Unlike :meth:`exists` this method is always case-sensitive. ''' return name and name in self.name_path_map
def has_name_and_is_not_empty(self, name): if not self.has_name(name): return False try: return os.path.getsize(self.name_path_map[name]) > 0 except OSError: return False def has_name_case_insensitive(self, name): if not name: return False name = name.lower() for q in self.name_path_map: if q.lower() == name: return True return False
[docs] def relpath(self, path, base=None): '''Convert an absolute path (with os separators) to a path relative to base (defaults to self.root). The relative path is *not* a name. Use :meth:`abspath_to_name` for that.''' return relpath(path, base or self.root)
def ok_to_be_unmanifested(self, name): return name in self.names_that_need_not_be_manifested @property def names_that_need_not_be_manifested(self): ' Set of names that are allowed to be missing from the manifest. Depends on the e-book file format. ' return {self.opf_name} @property def names_that_must_not_be_removed(self): ' Set of names that must never be deleted from the container. Depends on the e-book file format. ' return {self.opf_name} @property def names_that_must_not_be_changed(self): ' Set of names that must never be renamed. Depends on the e-book file format. ' return set() def parse(self, path, mime): with open(path, 'rb') as src: data = src.read() if mime in OEB_DOCS: data = self.parse_xhtml(data, self.relpath(path)) elif mime[-4:] in {'+xml', '/xml'}: data = self.parse_xml(data) elif mime in OEB_STYLES: data = self.parse_css(data, self.relpath(path)) return data
[docs] def raw_data(self, name, decode=True, normalize_to_nfc=True): ''' Return the raw data corresponding to the file specified by name :param decode: If True and the file has a text based MIME type, decode it and return a unicode object instead of raw bytes. :param normalize_to_nfc: If True the returned unicode object is normalized to the NFC normal form as is required for the EPUB and AZW3 file formats. ''' with self.open(name) as nf: ans = nf.read() mime = self.mime_map.get(name, guess_type(name)) if decode and (mime in OEB_STYLES or mime in OEB_DOCS or mime == 'text/plain' or mime[-4:] in {'+xml', '/xml'}): ans = self.decode(ans, normalize_to_nfc=normalize_to_nfc) return ans
[docs] def parsed(self, name): ''' Return a parsed representation of the file specified by name. For HTML and XML files an lxml tree is returned. For CSS files a css_parser stylesheet is returned. Note that parsed objects are cached for performance. If you make any changes to the parsed object, you must call :meth:`dirty` so that the container knows to update the cache. See also :meth:`replace`.''' ans = self.parsed_cache.get(name, None) if ans is None: self.used_encoding = None mime = self.mime_map.get(name, guess_type(name)) ans = self.parse(self.name_path_map[name], mime) self.parsed_cache[name] = ans self.encoding_map[name] = self.used_encoding return ans
[docs] def replace(self, name, obj): ''' Replace the parsed object corresponding to name with obj, which must be a similar object, i.e. an lxml tree for HTML/XML or a css_parser stylesheet for a CSS file. ''' self.parsed_cache[name] = obj self.dirty(name)
@property def opf(self): ' The parsed OPF file ' return self.parsed(self.opf_name) @property def mi(self): ''' The metadata of this book as a Metadata object. Note that this object is constructed on the fly every time this property is requested, so use it sparingly. ''' from calibre.ebooks.metadata.opf2 import OPF as O mi = self.serialize_item(self.opf_name) return O(BytesIO(mi), basedir=self.opf_dir, unquote_urls=False, populate_spine=False).to_book_metadata() @property def opf_version(self): ' The version set on the OPF\'s <package> element ' try: return self.opf_xpath('//opf:package/@version')[0] except IndexError: return '' @property def opf_version_parsed(self): ' The version set on the OPF\'s <package> element as a tuple of integers ' return parse_opf_version(self.opf_version) @property def manifest_items(self): return self.opf_xpath('//opf:manifest/opf:item[@href and @id]') @property def manifest_id_map(self): ' Mapping of manifest id to canonical names ' return {item.get('id'):self.href_to_name(item.get('href'), self.opf_name) for item in self.manifest_items} @property def manifest_type_map(self): ' Mapping of manifest media-type to list of canonical names of that media-type ' ans = defaultdict(list) for item in self.opf_xpath('//opf:manifest/opf:item[@href and @media-type]'): ans[item.get('media-type').lower()].append(self.href_to_name( item.get('href'), self.opf_name)) return {mt:tuple(v) for mt, v in iteritems(ans)}
[docs] def manifest_items_with_property(self, property_name): ' All manifest items that have the specified property ' prefixes = read_prefixes(self.opf) for item in items_with_property(self.opf, property_name, prefixes): href = item.get('href') if href: yield self.href_to_name(item.get('href'), self.opf_name)
[docs] def manifest_items_of_type(self, predicate): ''' The names of all manifest items whose media-type matches predicate. `predicate` can be a set, a list, a string or a function taking a single argument, which will be called with the media-type. ''' if isinstance(predicate, str): predicate = predicate.__eq__ elif hasattr(predicate, '__contains__'): predicate = predicate.__contains__ for mt, names in iteritems(self.manifest_type_map): if predicate(mt): yield from names
[docs] def apply_unique_properties(self, name, *properties): ''' Ensure that the specified properties are set on only the manifest item identified by name. You can pass None as the name to remove the property from all items. ''' properties = frozenset(properties) removed_names, added_names = [], [] for p in properties: if p.startswith('calibre:'): ensure_prefix(self.opf, None, 'calibre', CALIBRE_PREFIX) break for item in self.opf_xpath('//opf:manifest/opf:item'): iname = self.href_to_name(item.get('href'), self.opf_name) props = (item.get('properties') or '').split() lprops = {p.lower() for p in props} for prop in properties: if prop.lower() in lprops: if name != iname: removed_names.append(iname) props = [p for p in props if p.lower() != prop] if props: item.set('properties', ' '.join(props)) else: del item.attrib['properties'] else: if name == iname: added_names.append(iname) props.append(prop) item.set('properties', ' '.join(props)) self.dirty(self.opf_name) return removed_names, added_names
[docs] def add_properties(self, name, *properties): ''' Add the specified properties to the manifest item identified by name. ''' properties = frozenset(properties) if not properties: return True for p in properties: if p.startswith('calibre:'): ensure_prefix(self.opf, None, 'calibre', CALIBRE_PREFIX) break for item in self.opf_xpath('//opf:manifest/opf:item'): iname = self.href_to_name(item.get('href'), self.opf_name) if name == iname: props = frozenset((item.get('properties') or '').split()) | properties item.set('properties', ' '.join(props)) return True return False
@property def guide_type_map(self): ' Mapping of guide type to canonical name ' return {item.get('type', ''):self.href_to_name(item.get('href'), self.opf_name) for item in self.opf_xpath('//opf:guide/opf:reference[@href and @type]')} @property def spine_iter(self): ''' An iterator that yields item, name is_linear for every item in the books' spine. item is the lxml element, name is the canonical file name and is_linear is True if the item is linear. See also: :attr:`spine_names` and :attr:`spine_items`. ''' manifest_id_map = self.manifest_id_map non_linear = [] for item in self.opf_xpath('//opf:spine/opf:itemref[@idref]'): idref = item.get('idref') name = manifest_id_map.get(idref, None) path = self.name_path_map.get(name, None) if path: if item.get('linear', 'yes') == 'yes': yield item, name, True else: non_linear.append((item, name)) for item, name in non_linear: yield item, name, False def index_in_spine(self, name): manifest_id_map = self.manifest_id_map for i, item in enumerate(self.opf_xpath('//opf:spine/opf:itemref[@idref]')): idref = item.get('idref') q = manifest_id_map.get(idref, None) if q == name: return i @property def spine_names(self): ''' An iterator yielding name and is_linear for every item in the books' spine. See also: :attr:`spine_iter` and :attr:`spine_items`. ''' for item, name, linear in self.spine_iter: yield name, linear @property def spine_items(self): ''' An iterator yielding the path for every item in the books' spine. See also: :attr:`spine_iter` and :attr:`spine_items`. ''' for name, linear in self.spine_names: yield self.name_path_map[name]
[docs] def remove_from_spine(self, spine_items, remove_if_no_longer_in_spine=True): ''' Remove the specified items (by canonical name) from the spine. If ``remove_if_no_longer_in_spine`` is True, the items are also deleted from the book, not just from the spine. ''' nixed = set() for (name, remove), (item, xname, linear) in zip(spine_items, self.spine_iter): if remove and name == xname: self.remove_from_xml(item) nixed.add(name) if remove_if_no_longer_in_spine: # Remove from the book if no longer in spine nixed -= {name for name, linear in self.spine_names} for name in nixed: self.remove_item(name)
[docs] def set_spine(self, spine_items): ''' Set the spine to be spine_items where spine_items is an iterable of the form (name, linear). Will raise an error if one of the names is not present in the manifest. ''' imap = self.manifest_id_map imap = {name:item_id for item_id, name in iteritems(imap)} items = [item for item, name, linear in self.spine_iter] tail, last_tail = (items[0].tail, items[-1].tail) if items else ('\n ', '\n ') for i in items: self.remove_from_xml(i) spine = self.opf_xpath('//opf:spine')[0] spine.text = tail for name, linear in spine_items: i = spine.makeelement('{%s}itemref' % OPF_NAMESPACES['opf'], nsmap={'opf':OPF_NAMESPACES['opf']}) i.tail = tail i.set('idref', imap[name]) spine.append(i) if not linear: i.set('linear', 'no') if len(spine) > 0: spine[-1].tail = last_tail self.dirty(self.opf_name)
[docs] def remove_item(self, name, remove_from_guide=True): ''' Remove the item identified by name from this container. This removes all references to the item in the OPF manifest, guide and spine as well as from any internal caches. ''' removed = set() for elem in self.opf_xpath('//opf:manifest/opf:item[@href]'): if self.href_to_name(elem.get('href'), self.opf_name) == name: id_ = elem.get('id', None) if id_ is not None: removed.add(id_) self.remove_from_xml(elem) self.dirty(self.opf_name) if removed: for spine in self.opf_xpath('//opf:spine'): tocref = spine.attrib.get('toc', None) if tocref and tocref in removed: spine.attrib.pop('toc', None) self.dirty(self.opf_name) for item in self.opf_xpath('//opf:spine/opf:itemref[@idref]'): idref = item.get('idref') if idref in removed: self.remove_from_xml(item) self.dirty(self.opf_name) for meta in self.opf_xpath('//opf:meta[@name="cover" and @content]'): if meta.get('content') in removed: self.remove_from_xml(meta) self.dirty(self.opf_name) for meta in self.opf_xpath('//opf:meta[@refines]'): q = meta.get('refines') if q.startswith('#') and q[1:] in removed: self.remove_from_xml(meta) self.dirty(self.opf_name) if remove_from_guide: for item in self.opf_xpath('//opf:guide/opf:reference[@href]'): if self.href_to_name(item.get('href'), self.opf_name) == name: self.remove_from_xml(item) self.dirty(self.opf_name) path = self.name_path_map.pop(name, None) if path and os.path.exists(path): os.remove(path) self.mime_map.pop(name, None) self.parsed_cache.pop(name, None) self.dirtied.discard(name)
def set_media_overlay_durations(self, duration_map): self.dirty(self.opf_name) for meta in self.opf_xpath('//opf:meta[@property="media:duration"]'): self.remove_from_xml(meta) metadata = self.opf_xpath('//opf:metadata')[0] total_duration = 0 for item_id, duration in duration_map.items(): meta = metadata.makeelement(OPF('meta'), property="media:duration", refines="#" + item_id) meta.text = seconds_to_timestamp(duration) self.insert_into_xml(metadata, meta) total_duration += duration meta = metadata.makeelement(OPF('meta'), property="media:duration") meta.text = seconds_to_timestamp(total_duration) self.insert_into_xml(metadata, meta)
[docs] def dirty(self, name): ''' Mark the parsed object corresponding to name as dirty. See also: :meth:`parsed`. ''' self.dirtied.add(name)
[docs] def remove_from_xml(self, item): 'Removes item from parent, fixing indentation (works only with self closing items)' parent = item.getparent() idx = parent.index(item) if idx == 0: # We are removing the first item - only care about adjusting # the tail if this was the only child if len(parent) == 1: parent.text = item.tail else: # Make sure the preceding item has this tail parent[idx-1].tail = item.tail parent.remove(item) return item
[docs] def insert_into_xml(self, parent, item, index=None): '''Insert item into parent (or append if index is None), fixing indentation. Only works with self closing items.''' if index is None: parent.append(item) else: parent.insert(index, item) idx = parent.index(item) if idx == 0: item.tail = parent.text # If this is the only child of this parent element, we need a # little extra work as we have gone from a self-closing <foo /> # element to <foo><item /></foo> if len(parent) == 1: sibling = parent.getprevious() if sibling is None: # Give up! return parent.text = sibling.text item.tail = sibling.tail else: item.tail = parent[idx-1].tail if idx == len(parent)-1: parent[idx-1].tail = parent.text
[docs] def opf_get_or_create(self, name): ''' Convenience method to either return the first XML element with the specified name or create it under the opf:package element and then return it, if it does not already exist. ''' ans = self.opf_xpath('//opf:'+name) if ans: return ans[0] self.dirty(self.opf_name) package = self.opf_xpath('//opf:package')[0] item = package.makeelement(OPF(name)) item.tail = '\n' package.append(item) return item
[docs] def generate_item(self, name, id_prefix=None, media_type=None, unique_href=True): '''Add an item to the manifest with href derived from the given name. Ensures uniqueness of href and id automatically. Returns generated item.''' id_prefix = id_prefix or 'id' media_type = media_type or self.guess_type(name) if unique_href: name = self.make_name_unique(name) href = self.name_to_href(name, self.opf_name) base, ext = href.rpartition('.')[0::2] all_ids = {x.get('id') for x in self.opf_xpath('//*[@id]')} if id_prefix.endswith('-'): all_ids.add(id_prefix) c = 0 item_id = id_prefix while item_id in all_ids: c += 1 item_id = f'{id_prefix}{c}' manifest = self.opf_xpath('//opf:manifest')[0] item = manifest.makeelement(OPF('item'), id=item_id, href=href) item.set('media-type', media_type) self.insert_into_xml(manifest, item) self.dirty(self.opf_name) name = self.href_to_name(href, self.opf_name) self.name_path_map[name] = path = self.name_to_abspath(name) self.mime_map[name] = media_type # Ensure that the file corresponding to the newly created item exists # otherwise cloned containers will fail when they try to get the number # of links to the file base = os.path.dirname(path) if not os.path.exists(base): os.makedirs(base) open(path, 'wb').close() return item
def format_opf(self): try: mdata = self.opf_xpath('//opf:metadata')[0] except IndexError: pass else: mdata.text = '\n ' remove = set() for child in mdata: child.tail = '\n ' try: if (child.get('name', '').startswith('calibre:' ) and child.get('content', '').strip() in {'{}', ''}): remove.add(child) except AttributeError: continue # Happens for XML comments for child in remove: mdata.remove(child) if len(mdata) > 0: mdata[-1].tail = '\n ' # Ensure name comes before content, needed for Nooks for meta in self.opf_xpath('//opf:meta[@name="cover"]'): if 'content' in meta.attrib: meta.set('content', meta.attrib.pop('content'))
[docs] def serialize_item(self, name): ''' Convert a parsed object (identified by canonical name) into a bytestring. See :meth:`parsed`. ''' data = root = self.parsed(name) if name == self.opf_name: self.format_opf() data = serialize(data, self.mime_map[name], pretty_print=name in self.pretty_print) if name == self.opf_name and root.nsmap.get(None) == OPF2_NS: # Needed as I can't get lxml to output opf:role and # not output <opf:metadata> as well data = re.sub(br'(<[/]{0,1})opf:', br'\1', data) return data
[docs] def commit_item(self, name, keep_parsed=False): ''' Commit a parsed object to disk (it is serialized and written to the underlying file). If ``keep_parsed`` is True the parsed representation is retained in the cache. See also: :meth:`parsed` ''' if name not in self.parsed_cache: return data = self.serialize_item(name) self.dirtied.discard(name) if not keep_parsed: self.parsed_cache.pop(name) dest = self.name_path_map[name] if self.cloned and nlinks_file(dest) > 1: # Decouple this file from its links os.unlink(dest) with open(dest, 'wb') as f: f.write(data)
[docs] def filesize(self, name): ''' Return the size in bytes of the file represented by the specified canonical name. Automatically handles dirtied parsed objects. See also: :meth:`parsed` ''' if name in self.dirtied: self.commit_item(name, keep_parsed=True) path = self.name_to_abspath(name) return os.path.getsize(path)
[docs] def get_file_path_for_processing(self, name, allow_modification=True): ''' Similar to open() except that it returns a file path, instead of an open file object. ''' if name in self.dirtied: self.commit_item(name) self.parsed_cache.pop(name, False) path = self.name_to_abspath(name) base = os.path.dirname(path) if not os.path.exists(base): os.makedirs(base) else: if self.cloned and allow_modification and os.path.exists(path) and nlinks_file(path) > 1: # Decouple this file from its links temp = path + 'xxx' shutil.copyfile(path, temp) if iswindows: retry_on_fail(os.unlink, path) else: os.unlink(path) os.rename(temp, path) return path
[docs] def open(self, name, mode='rb'): ''' Open the file pointed to by name for direct read/write. Note that this will commit the file if it is dirtied and remove it from the parse cache. You must finish with this file before accessing the parsed version of it again, or bad things will happen. ''' return open(self.get_file_path_for_processing(name, mode not in {'r', 'rb'}), mode)
[docs] def commit(self, outpath=None, keep_parsed=False): ''' Commit all dirtied parsed objects to the filesystem and write out the e-book file at outpath. :param output: The path to write the saved e-book file to. If None, the path of the original book file is used. :param keep_parsed: If True the parsed representations of committed items are kept in the cache. ''' for name in tuple(self.dirtied): self.commit_item(name, keep_parsed=keep_parsed)
def compare_to(self, other): if set(self.name_path_map) != set(other.name_path_map): return 'Set of files is not the same' mismatches = [] for name, path in iteritems(self.name_path_map): opath = other.name_path_map[name] with open(path, 'rb') as f1, open(opath, 'rb') as f2: if f1.read() != f2.read(): mismatches.append('The file %s is not the same'%name) return '\n'.join(mismatches)
# }}} # EPUB {{{ class InvalidEpub(InvalidBook): pass class ObfuscationKeyMissing(InvalidEpub): pass OCF_NS = 'urn:oasis:names:tc:opendocument:xmlns:container' VCS_IGNORE_FILES = frozenset('.gitignore .hgignore .agignore .bzrignore'.split()) VCS_DIRS = frozenset(('.git', '.hg', '.svn', '.bzr')) def walk_dir(basedir): for dirpath, dirnames, filenames in os.walk(basedir): for vcsdir in VCS_DIRS: try: dirnames.remove(vcsdir) except Exception: pass is_root = os.path.abspath(os.path.normcase(dirpath)) == os.path.abspath(os.path.normcase(basedir)) yield is_root, dirpath, None for fname in filenames: if fname not in VCS_IGNORE_FILES: yield is_root, dirpath, fname class EpubContainer(Container): book_type = 'epub' @property def book_type_for_display(self): ans = self.book_type.upper() try: v = self.opf_version_parsed except Exception: pass else: try: if v.major == 2: ans += ' 2' else: if not v.minor: ans += f' {v.major}' else: ans += f' {v.major}.{v.minor}' except Exception: pass return ans META_INF = { 'container.xml': True, 'manifest.xml': False, 'encryption.xml': False, 'metadata.xml': False, 'signatures.xml': False, 'rights.xml': False, } def __init__(self, pathtoepub, log, clone_data=None, tdir=None): if clone_data is not None: super().__init__(None, None, log, clone_data=clone_data) for x in ('pathtoepub', 'obfuscated_fonts', 'is_dir'): setattr(self, x, clone_data[x]) return self.pathtoepub = pathtoepub if tdir is None: tdir = PersistentTemporaryDirectory('_epub_container') tdir = os.path.abspath(os.path.realpath(tdir)) self.root = tdir self.is_dir = os.path.isdir(pathtoepub) if self.is_dir: for is_root, dirpath, fname in walk_dir(self.pathtoepub): if is_root: base = tdir else: base = os.path.join(tdir, os.path.relpath(dirpath, self.pathtoepub)) if fname is None: os.mkdir(base) if fname is not None: shutil.copy(os.path.join(dirpath, fname), os.path.join(base, fname)) else: with open(self.pathtoepub, 'rb') as stream: try: zf = ZipFile(stream) zf.extractall(tdir) except: log.exception('EPUB appears to be invalid ZIP file, trying a' ' more forgiving ZIP parser') from calibre.utils.localunzip import extractall stream.seek(0) extractall(stream, path=tdir) try: os.remove(join(tdir, 'mimetype')) except OSError: pass # Ensure all filenames are in NFC normalized form # has no effect on HFS+ filesystems as they always store filenames # in NFD form for filename in walk(self.root): n = unicodedata.normalize('NFC', filename) if n != filename: s = filename + 'suff1x' os.rename(filename, s) os.rename(s, n) container_path = join(self.root, 'META-INF', 'container.xml') if not exists(container_path): raise InvalidEpub('No META-INF/container.xml in epub') with open(container_path, 'rb') as cf: container = safe_xml_fromstring(cf.read()) opf_files = container.xpath(( r'child::ocf:rootfiles/ocf:rootfile' '[@media-type="%s" and @full-path]'%guess_type('a.opf') ), namespaces={'ocf':OCF_NS} ) if not opf_files: raise InvalidEpub('META-INF/container.xml contains no link to OPF file') opf_path = os.path.join(self.root, *(urlunquote(opf_files[0].get('full-path')).split('/'))) if not exists(opf_path): raise InvalidEpub('OPF file does not exist at location pointed to' ' by META-INF/container.xml') super().__init__(tdir, opf_path, log) self.obfuscated_fonts = {} if 'META-INF/encryption.xml' in self.name_path_map: self.process_encryption() self.parsed_cache['META-INF/container.xml'] = container def clone_data(self, dest_dir): ans = super().clone_data(dest_dir) ans['pathtoepub'] = self.pathtoepub ans['obfuscated_fonts'] = self.obfuscated_fonts.copy() ans['is_dir'] = self.is_dir return ans def rename(self, old_name, new_name): is_opf = old_name == self.opf_name super().rename(old_name, new_name) if is_opf: for elem in self.parsed('META-INF/container.xml').xpath(( r'child::ocf:rootfiles/ocf:rootfile' '[@media-type="%s" and @full-path]'%guess_type('a.opf') ), namespaces={'ocf':OCF_NS} ): # The asinine epubcheck cannot handle quoted filenames in # container.xml elem.set('full-path', self.opf_name) self.dirty('META-INF/container.xml') if old_name in self.obfuscated_fonts: self.obfuscated_fonts[new_name] = self.obfuscated_fonts.pop(old_name) enc = self.parsed('META-INF/encryption.xml') for cr in enc.xpath('//*[local-name()="CipherReference" and @URI]'): if self.href_to_name(cr.get('URI')) == old_name: cr.set('URI', self.name_to_href(new_name)) self.dirty('META-INF/encryption.xml') @property def names_that_need_not_be_manifested(self): return super().names_that_need_not_be_manifested | {'META-INF/' + x for x in self.META_INF} def ok_to_be_unmanifested(self, name): return name in self.names_that_need_not_be_manifested or name.startswith('META-INF/') @property def names_that_must_not_be_removed(self): return super().names_that_must_not_be_removed | {'META-INF/container.xml'} @property def names_that_must_not_be_changed(self): return super().names_that_must_not_be_changed | {'META-INF/' + x for x in self.META_INF} def remove_item(self, name, remove_from_guide=True): # Handle removal of obfuscated fonts if name == 'META-INF/encryption.xml': self.obfuscated_fonts.clear() if name in self.obfuscated_fonts: self.obfuscated_fonts.pop(name, None) enc = self.parsed('META-INF/encryption.xml') for em in enc.xpath('//*[local-name()="EncryptionMethod" and @Algorithm]'): alg = em.get('Algorithm') if alg not in {ADOBE_OBFUSCATION, IDPF_OBFUSCATION}: continue try: cr = em.getparent().xpath('descendant::*[local-name()="CipherReference" and @URI]')[0] except (IndexError, ValueError, KeyError): continue if name == self.href_to_name(cr.get('URI')): self.remove_from_xml(em.getparent()) self.dirty('META-INF/encryption.xml') super().remove_item(name, remove_from_guide=remove_from_guide) def read_raw_unique_identifier(self): package_id = raw_unique_identifier = idpf_key = None for attrib, val in iteritems(self.opf.attrib): if attrib.endswith('unique-identifier'): package_id = val break if package_id is not None: for elem in self.opf_xpath('//*[@id=%s]'%escape_xpath_attr(package_id)): if elem.text: raw_unique_identifier = elem.text break if raw_unique_identifier is not None: idpf_key = raw_unique_identifier idpf_key = re.sub('[\u0020\u0009\u000d\u000a]', '', idpf_key) idpf_key = hashlib.sha1(idpf_key.encode('utf-8')).digest() return package_id, raw_unique_identifier, idpf_key def iter_encryption_entries(self): if 'META-INF/encryption.xml' in self.name_path_map: enc = self.parsed('META-INF/encryption.xml') for em in enc.xpath('//*[local-name()="EncryptionMethod" and @Algorithm]'): try: cr = em.getparent().xpath('descendant::*[local-name()="CipherReference" and @URI]')[0] except Exception: cr = None yield em, cr def process_encryption(self): fonts = {} for em, cr in self.iter_encryption_entries(): alg = em.get('Algorithm') if alg not in {ADOBE_OBFUSCATION, IDPF_OBFUSCATION}: raise DRMError() if cr is None: continue name = self.href_to_name(cr.get('URI')) path = self.name_path_map.get(name, None) if path is not None: fonts[name] = alg package_id, raw_unique_identifier, idpf_key = self.read_raw_unique_identifier() key = None for item in self.opf_xpath('//*[local-name()="metadata"]/*' '[local-name()="identifier"]'): scheme = None for xkey in item.attrib.keys(): if xkey.endswith('scheme'): scheme = item.get(xkey) if (scheme and scheme.lower() == 'uuid') or \ (item.text and item.text.startswith('urn:uuid:')): try: key = item.text.rpartition(':')[-1] key = uuid.UUID(key).bytes except Exception: self.log.exception('Failed to parse obfuscation key') key = None for font, alg in iteritems(fonts): tkey = key if alg == ADOBE_OBFUSCATION else idpf_key if not tkey: raise ObfuscationKeyMissing('Failed to find obfuscation key') raw = self.raw_data(font, decode=False) raw = decrypt_font_data(tkey, raw, alg) with self.open(font, 'wb') as f: f.write(raw) self.obfuscated_fonts[font] = (alg, tkey) def update_modified_timestamp(self): from calibre.ebooks.metadata.opf3 import set_last_modified_in_opf set_last_modified_in_opf(self.opf) self.dirty(self.opf_name) def commit(self, outpath=None, keep_parsed=False): if self.opf_version_parsed.major == 3: self.update_modified_timestamp() super().commit(keep_parsed=keep_parsed) container_path = join(self.root, 'META-INF', 'container.xml') if not exists(container_path): raise InvalidEpub('No META-INF/container.xml in EPUB, this typically happens if the temporary files calibre' ' is using are deleted by some other program while calibre is running') restore_fonts = {} for name in self.obfuscated_fonts: if name not in self.name_path_map: continue alg, key = self.obfuscated_fonts[name] # Decrypting and encrypting are the same operation (XOR with key) restore_fonts[name] = data = self.raw_data(name, decode=False) with self.open(name, 'wb') as f: f.write(decrypt_font_data(key, data, alg)) if outpath is None: outpath = self.pathtoepub if self.is_dir: # First remove items from the source dir that do not exist any more for is_root, dirpath, fname in walk_dir(self.pathtoepub): if fname is not None: if is_root and fname == 'mimetype': continue base = self.root if is_root else os.path.join(self.root, os.path.relpath(dirpath, self.pathtoepub)) fpath = os.path.join(base, fname) if not os.path.exists(fpath): os.remove(os.path.join(dirpath, fname)) try: os.rmdir(dirpath) except OSError as err: if err.errno != errno.ENOTEMPTY: raise # Now copy over everything from root to source dir for dirpath, dirnames, filenames in os.walk(self.root): is_root = os.path.abspath(os.path.normcase(dirpath)) == os.path.abspath(os.path.normcase(self.root)) base = self.pathtoepub if is_root else os.path.join(self.pathtoepub, os.path.relpath(dirpath, self.root)) try: os.mkdir(base) except OSError as err: if err.errno != errno.EEXIST: raise for fname in filenames: with open(os.path.join(dirpath, fname), 'rb') as src, open(os.path.join(base, fname), 'wb') as dest: shutil.copyfileobj(src, dest) else: from calibre.ebooks.tweak import zip_rebuilder with open(join(self.root, 'mimetype'), 'wb') as f: et = guess_type('a.epub') if not isinstance(et, bytes): et = et.encode('ascii') f.write(et) zip_rebuilder(self.root, outpath) for name, data in iteritems(restore_fonts): with self.open(name, 'wb') as f: f.write(data) @property def path_to_ebook(self): return self.pathtoepub @path_to_ebook.setter def path_to_ebook(self, val): self.pathtoepub = val # }}} # AZW3 {{{ class InvalidMobi(InvalidBook): pass def do_explode(path, dest): from calibre.ebooks.mobi.reader.mobi6 import MobiReader from calibre.ebooks.mobi.reader.mobi8 import Mobi8Reader with open(path, 'rb') as stream: mr = MobiReader(stream, default_log, None, None) with CurrentDir(dest): mr = Mobi8Reader(mr, default_log, for_tweak=True) opf = os.path.abspath(mr()) obfuscated_fonts = mr.encrypted_fonts return opf, obfuscated_fonts def opf_to_azw3(opf, outpath, container): from calibre.ebooks.conversion.plumber import Plumber, create_oebbook from calibre.ebooks.mobi.tweak import set_cover class Item(Manifest.Item): def _parse_css(self, data): # The default CSS parser used by oeb.base inserts the h namespace # and resolves all @import rules. We dont want that. return container.parse_css(data) def specialize(oeb): oeb.manifest.Item = Item plumber = Plumber(opf, outpath, container.log) plumber.setup_options() inp = plugin_for_input_format('azw3') outp = plugin_for_output_format('azw3') plumber.opts.mobi_passthrough = True plumber.opts.keep_ligatures = True oeb = create_oebbook(container.log, opf, plumber.opts, specialize=specialize) set_cover(oeb) outp.convert(oeb, outpath, inp, plumber.opts, container.log) def epub_to_azw3(epub, outpath=None): container = get_container(epub, tweak_mode=True) changed = False for item in container.opf_xpath('//opf:manifest/opf:item[@properties and @href]'): p = item.get('properties').split() if 'cover-image' in p: href = item.get('href') guides = container.opf_xpath('//opf:guide') if not guides: guides = (container.opf.makeelement(OPF('guide')),) container.opf.append(guides[0]) for guide in guides: for child in guide: if child.get('type') == 'cover': break else: guide.append(guide.makeelement(OPF('reference'), type='cover', href=href)) changed = True break elif 'calibre:title-page' in p: item.getparent().remove(item) if changed: container.dirty(container.opf_name) container.commit_item(container.opf_name) outpath = outpath or (epub.rpartition('.')[0] + '.azw3') opf_to_azw3(container.name_to_abspath(container.opf_name), outpath, container) class AZW3Container(Container): book_type = 'azw3' SUPPORTS_TITLEPAGES = False SUPPORTS_FILENAMES = False def __init__(self, pathtoazw3, log, clone_data=None, tdir=None): if clone_data is not None: super().__init__(None, None, log, clone_data=clone_data) for x in ('pathtoazw3', 'obfuscated_fonts'): setattr(self, x, clone_data[x]) return self.pathtoazw3 = pathtoazw3 if tdir is None: tdir = PersistentTemporaryDirectory('_azw3_container') tdir = os.path.abspath(os.path.realpath(tdir)) self.root = tdir with open(pathtoazw3, 'rb') as stream: raw = stream.read(3) if raw == b'TPZ': raise InvalidMobi(_('This is not a MOBI file. It is a Topaz file.')) try: header = MetadataHeader(stream, default_log) except MobiError: raise InvalidMobi(_('This is not a MOBI file.')) if header.encryption_type != 0: raise DRMError() kf8_type = header.kf8_type if kf8_type is None: raise InvalidMobi(_('This MOBI file does not contain a KF8 format ' 'book. KF8 is the new format from Amazon. calibre can ' 'only edit MOBI files that contain KF8 books. Older ' 'MOBI files without KF8 are not editable.')) if kf8_type == 'joint': raise InvalidMobi(_('This MOBI file contains both KF8 and ' 'older Mobi6 data. calibre can only edit MOBI files ' 'that contain only KF8 data.')) try: opf_path, obfuscated_fonts = fork_job( 'calibre.ebooks.oeb.polish.container', 'do_explode', args=(pathtoazw3, tdir), no_output=True)['result'] except WorkerError as e: log(e.orig_tb) raise InvalidMobi('Failed to explode MOBI') super().__init__(tdir, opf_path, log) self.obfuscated_fonts = {x.replace(os.sep, '/') for x in obfuscated_fonts} def clone_data(self, dest_dir): ans = super().clone_data(dest_dir) ans['pathtoazw3'] = self.pathtoazw3 ans['obfuscated_fonts'] = self.obfuscated_fonts.copy() return ans def commit(self, outpath=None, keep_parsed=False): super().commit(keep_parsed=keep_parsed) if outpath is None: outpath = self.pathtoazw3 opf_to_azw3(self.name_path_map[self.opf_name], outpath, self) @property def path_to_ebook(self): return self.pathtoazw3 @path_to_ebook.setter def path_to_ebook(self, val): self.pathtoazw3 = val @property def names_that_must_not_be_changed(self): return set(self.name_path_map) # }}} def get_container(path, log=None, tdir=None, tweak_mode=False): if log is None: log = default_log try: isdir = os.path.isdir(path) except Exception: isdir = False own_tdir = not tdir ebook_cls = (AZW3Container if path.rpartition('.')[-1].lower() in {'azw3', 'mobi', 'original_azw3', 'original_mobi'} and not isdir else EpubContainer) if own_tdir: tdir = PersistentTemporaryDirectory(f'_{ebook_cls.book_type}_container') try: ebook = ebook_cls(path, log, tdir=tdir) ebook.tweak_mode = tweak_mode except BaseException: if own_tdir: shutil.rmtree(tdir, ignore_errors=True) raise return ebook def test_roundtrip(): ebook = get_container(sys.argv[-1]) p = PersistentTemporaryFile(suffix='.'+sys.argv[-1].rpartition('.')[-1]) p.close() ebook.commit(outpath=p.name) ebook2 = get_container(p.name) ebook3 = get_container(p.name) diff = ebook3.compare_to(ebook2) if diff is not None: print(diff) if __name__ == '__main__': test_roundtrip()