__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
'''
Defines various abstract base classes that can be subclassed to create powerful news fetching recipes.
'''
__docformat__ = "restructuredtext en"
import io
import os
import re
import sys
import time
import traceback
from collections import defaultdict
from contextlib import closing
from urllib.parse import urlparse, urlsplit
from calibre import __appname__, as_unicode, browser, force_unicode, iswindows, preferred_encoding, random_user_agent, strftime
from calibre.ebooks.BeautifulSoup import BeautifulSoup, CData, NavigableString, Tag
from calibre.ebooks.metadata import MetaInformation
from calibre.ebooks.metadata.opf2 import OPFCreator
from calibre.ebooks.metadata.toc import TOC
from calibre.ptempfile import PersistentTemporaryFile
from calibre.utils.date import now as nowf
from calibre.utils.icu import numeric_sort_key
from calibre.utils.img import add_borders_to_image, image_to_data, save_cover_data_to
from calibre.utils.localization import _, canonicalize_lang, ngettext
from calibre.utils.logging import ThreadSafeWrapper
from calibre.utils.threadpool import NoResultsPending, ThreadPool, WorkRequest
from calibre.web import Recipe
from calibre.web.feeds import Feed, feed_from_xml, feeds_from_index, templates
from calibre.web.fetch.simple import AbortArticle, RecursiveFetcher
from calibre.web.fetch.simple import option_parser as web2disk_option_parser
from calibre.web.fetch.utils import prepare_masthead_image
from polyglot.builtins import string_or_bytes
def classes(classes):
    '''
    Build a BeautifulSoup search specification matching tags that carry at
    least one of the given CSS class names.

    :param classes: Class names separated by single spaces.
    :return: A dict of the form ``{'attrs': {'class': predicate}}``. The
             predicate returns its (falsy) argument unchanged when the class
             attribute is empty or missing, otherwise the set of wanted
             names found in the attribute (empty set when none match).
    '''
    wanted = frozenset(classes.split(' '))

    def has_wanted_class(value):
        return value and frozenset(value.split()).intersection(wanted)

    return {'attrs': {'class': has_wanted_class}}
def prefixed_classes(classes):
    '''
    Build a BeautifulSoup search specification matching tags that have at
    least one CSS class name starting with one of the given prefixes.

    :param classes: Whitespace separated class name prefixes.
    :return: A dict of the form ``{'attrs': {'class': matcher}}`` where
             ``matcher(value)`` returns True or False.
    '''
    # split() with no argument discards empty strings. An empty prefix must
    # never be used, since every class name "starts with" the empty string
    # and the matcher would then accept any tag that has a class attribute.
    prefixes = tuple(classes.split())

    def matcher(x):
        if not x:
            return False
        # str.startswith accepts a tuple and tests all prefixes in one call
        return any(candidate.startswith(prefixes) for candidate in x.split())

    return {'attrs': {'class': matcher}}
class LoginFailed(ValueError):
    '''
    Exception type derived from :class:`ValueError`. Judging by its name it
    signals a failed login; it is not raised anywhere in this part of the
    file.
    '''
class DownloadDenied(ValueError):
    '''
    Exception type derived from :class:`ValueError`. Judging by its name it
    signals that a download was refused; it is not raised anywhere in this
    part of the file.
    '''
[docs]
class BasicNewsRecipe(Recipe):
'''
Base class that contains logic needed in all recipes. By overriding
progressively more of the functionality in this class, you can make
progressively more customized/powerful recipes. For a tutorial introduction
to creating recipes, see :doc:`news`.
'''
#: The title to use for the e-book
title = _('Unknown News Source')
#: A couple of lines that describe the content this recipe downloads.
#: This will be used primarily in a GUI that presents a list of recipes.
description = ''
#: The author of this recipe
__author__ = __appname__
#: Minimum calibre version needed to use this recipe
requires_version = (0, 6, 0)
#: The language that the news is in. Must be an ISO-639 code either
#: two or three characters long
language = 'und'
#: Maximum number of articles to download from each feed. This is primarily
#: useful for feeds that don't have article dates. For most feeds, you should
#: use :attr:`BasicNewsRecipe.oldest_article`
max_articles_per_feed = 100
#: Oldest article to download from this news source. In days.
oldest_article = 7.0
#: Number of levels of links to follow on article webpages
recursions = 0
#: The default delay between consecutive downloads in seconds. The argument may be a
#: floating point number to indicate a more precise time. See :meth:`get_url_specific_delay`
#: to implement per URL delays.
delay = 0
#: Publication type
#: Set to newspaper, magazine or blog. If set to None, no publication type
#: metadata will be written to the opf file.
publication_type = 'unknown'
#: Number of simultaneous downloads. Set to 1 if the server is picky.
#: Automatically reduced to 1 if :attr:`BasicNewsRecipe.delay` > 0
simultaneous_downloads = 5
#: Timeout for fetching files from server in seconds
timeout = 120.0
#: The format string for the date shown on the first page.
#: By default: Day_Name, Day_Number Month_Name Year
timefmt = ' [%a, %d %b %Y]'
#: List of feeds to download.
#: Can be either ``[url1, url2, ...]`` or ``[('title1', url1), ('title2', url2),...]``
feeds = None
#: Max number of characters in the short description
summary_length = 500
#: Convenient flag to disable loading of stylesheets for websites
#: that have overly complex stylesheets unsuitable for conversion
#: to e-book formats.
#: If True stylesheets are not downloaded and processed
no_stylesheets = False
#: Convenient flag to strip all JavaScript tags from the downloaded HTML
remove_javascript = True
#: If True the GUI will ask the user for a username and password
#: to use while downloading.
#: If set to "optional" the use of a username and password becomes optional
needs_subscription = False
#: If True the navigation bar is center aligned, otherwise it is left aligned
center_navbar = True
#: Specify an override encoding for sites that have an incorrect
#: charset specification. The most common being specifying ``latin1`` and
#: using ``cp1252``. If None, try to detect the encoding. If it is a
#: callable, the callable is called with two arguments: The recipe object
#: and the source to be decoded. It must return the decoded source.
encoding = None
#: Normally we try to guess if a feed has full articles embedded in it
#: based on the length of the embedded content. If `None`, then the
#: default guessing is used. If `True` then we always assume the feed has
#: embedded content and if `False` we always assume the feed does not have
#: embedded content.
use_embedded_content = None
#: Set to True and implement :meth:`get_obfuscated_article` to handle
#: websites that try to make it difficult to scrape content.
articles_are_obfuscated = False
#: Reverse the order of articles in each feed
reverse_article_order = False
#: Automatically extract all the text from downloaded article pages. Uses
#: the algorithms from the readability project. Setting this to True, means
#: that you do not have to worry about cleaning up the downloaded HTML
#: manually (though manual cleanup will always be superior).
auto_cleanup = False
#: Specify elements that the auto cleanup algorithm should never remove.
#: The syntax is a XPath expression. For example::
#:
#: auto_cleanup_keep = '//div[@id="article-image"]' will keep all divs with
#: id="article-image"
#: auto_cleanup_keep = '//*[@class="important"]' will keep all elements
#: with class="important"
#: auto_cleanup_keep = '//div[@id="article-image"]|//span[@class="important"]'
#: will keep all divs with id="article-image" and spans
#: with class="important"
#:
auto_cleanup_keep = None
#: Specify any extra :term:`CSS` that should be added to downloaded :term:`HTML` files.
#: It will be inserted into `<style>` tags, just before the closing
#: `</head>` tag thereby overriding all :term:`CSS` except that which is
#: declared using the style attribute on individual :term:`HTML` tags.
#: Note that if you want to programmatically generate the extra_css override
#: the :meth:`get_extra_css()` method instead.
#: For example::
#:
#: extra_css = '.heading { font: serif x-large }'
#:
extra_css = None
#: If True empty feeds are removed from the output.
#: This option has no effect if parse_index is overridden in
#: the sub class. It is meant only for recipes that return a list
#: of feeds using `feeds` or :meth:`get_feeds`. It is also used if you use
#: the ignore_duplicate_articles option.
remove_empty_feeds = False
#: List of regular expressions that determines which links to follow.
#: If empty, it is ignored. Used only if is_link_wanted is
#: not implemented. For example::
#:
#: match_regexps = [r'page=[0-9]+']
#:
#: will match all URLs that have `page=some number` in them.
#:
#: Only one of :attr:`BasicNewsRecipe.match_regexps` or
#: :attr:`BasicNewsRecipe.filter_regexps` should be defined.
match_regexps = []
#: List of regular expressions that determines which links to ignore.
#: If empty it is ignored. Used only if is_link_wanted is not
#: implemented. For example::
#:
#: filter_regexps = [r'ads\.doubleclick\.net']
#:
#: will remove all URLs that have `ads.doubleclick.net` in them.
#:
#: Only one of :attr:`BasicNewsRecipe.match_regexps` or
#: :attr:`BasicNewsRecipe.filter_regexps` should be defined.
filter_regexps = []
#: Recipe specific options to control the conversion of the downloaded
#: content into an e-book. These will override any user or plugin specified
#: values, so only use if absolutely necessary. For example::
#:
#: conversion_options = {
#: 'base_font_size' : 16,
#: 'linearize_tables' : True,
#: }
#:
conversion_options = {}
#: List of tags to be removed. Specified tags are removed from downloaded HTML.
#: A tag is specified as a dictionary of the form::
#:
#: {
#: name : 'tag name', #e.g. 'div'
#: attrs : a dictionary, #e.g. {'class': 'advertisment'}
#: }
#:
#: All keys are optional. For a full explanation of the search criteria, see
#: `Beautiful Soup <https://www.crummy.com/software/BeautifulSoup/bs4/doc/#searching-the-tree>`__
#: A common example::
#:
#: remove_tags = [dict(name='div', class_='advert')]
#:
#: This will remove all `<div class="advert">` tags and all
#: their children from the downloaded :term:`HTML`.
remove_tags = []
#: Remove all tags that occur after the specified tag.
#: For the format for specifying a tag see :attr:`BasicNewsRecipe.remove_tags`.
#: For example::
#:
#: remove_tags_after = [dict(id='content')]
#:
#: will remove all
#: tags after the first element with `id="content"`.
remove_tags_after = None
#: Remove all tags that occur before the specified tag.
#: For the format for specifying a tag see :attr:`BasicNewsRecipe.remove_tags`.
#: For example::
#:
#: remove_tags_before = dict(id='content')
#:
#: will remove all
#: tags before the first element with `id="content"`.
remove_tags_before = None
#: List of attributes to remove from all tags.
#: For example::
#:
#: remove_attributes = ['style', 'font']
remove_attributes = []
#: Keep only the specified tags and their children.
#: For the format for specifying a tag see :attr:`BasicNewsRecipe.remove_tags`.
#: If this list is not empty, then the `<body>` tag will be emptied and re-filled with
#: the tags that match the entries in this list. For example::
#:
#: keep_only_tags = [dict(id=['content', 'heading'])]
#:
#: will keep only tags that have an `id` attribute of `"content"` or `"heading"`.
keep_only_tags = []
#: List of :term:`regexp` substitution rules to run on the downloaded :term:`HTML`.
#: Each element of the
#: list should be a two element tuple. The first element of the tuple should
#: be a compiled regular expression and the second a callable that takes
#: a single match object and returns a string to replace the match. For example::
#:
#: preprocess_regexps = [
#: (re.compile(r'<!--Article ends here-->.*</body>', re.DOTALL|re.IGNORECASE),
#: lambda match: '</body>'),
#: ]
#:
#: will remove everything from `<!--Article ends here-->` to `</body>`.
preprocess_regexps = []
#: The CSS that is used to style the templates, i.e., the navigation bars and
#: the Tables of Contents. Rather than overriding this variable, you should
#: use `extra_css` in your recipe to customize look and feel.
template_css = '''
.article_date {
color: gray; font-family: monospace;
}
.article_description {
text-indent: 0pt;
}
a.article {
font-weight: bold; text-align:left;
}
a.feed {
font-weight: bold;
}
.calibre_navbar {
font-family:monospace;
}
'''
#: By default, calibre will use a default image for the masthead (Kindle only).
#: Override this in your recipe to provide a URL to use as a masthead.
masthead_url = None
#: By default, the cover image returned by get_cover_url() will be used as
#: the cover for the periodical. Overriding this in your recipe instructs
#: calibre to render the downloaded cover into a frame whose width and height
#: are expressed as a percentage of the downloaded cover.
#: cover_margins = (10, 15, '#ffffff') pads the cover with a white margin
#: 10px on the left and right, 15px on the top and bottom.
#: Color names are defined `here <https://www.imagemagick.org/script/color.php>`_.
#: Note that for some reason, white does not always work in Windows. Use
#: #ffffff instead
cover_margins = (0, 0, '#ffffff')
#: Set to a non empty string to disable this recipe.
#: The string will be used as the disabled message
recipe_disabled = None
#: Ignore duplicates of articles that are present in more than one section.
#: A duplicate article is an article that has the same title and/or URL.
#: To ignore articles with the same title, set this to::
#:
#: ignore_duplicate_articles = {'title'}
#:
#: To use URLs instead, set it to::
#:
#: ignore_duplicate_articles = {'url'}
#:
#: To match on title or URL, set it to::
#:
#: ignore_duplicate_articles = {'title', 'url'}
ignore_duplicate_articles = None
# The following parameters control how the recipe attempts to minimize
# JPEG image sizes
#: Set this to False to ignore all scaling and compression parameters and
#: pass images through unmodified. If True and the other compression
#: parameters are left at their default values, JPEG images will be scaled to fit
#: in the screen dimensions set by the output profile and compressed to size at
#: most (w * h)/16 where w x h are the scaled image dimensions.
compress_news_images = False
#: The factor used when auto compressing JPEG images. If set to None,
#: auto compression is disabled. Otherwise, the images will be reduced in size to
#: (w * h)/compress_news_images_auto_size bytes if possible by reducing
#: the quality level, where w x h are the image dimensions in pixels.
#: The minimum JPEG quality will be 5/100 so it is possible this constraint
#: will not be met. This parameter can be overridden by the parameter
#: compress_news_images_max_size which provides a fixed maximum size for images.
#: Note that if you enable scale_news_images_to_device then the image will
#: first be scaled and then its quality lowered until its size is less than
#: (w * h)/factor where w and h are now the *scaled* image dimensions. In
#: other words, this compression happens after scaling.
compress_news_images_auto_size = 16
#: Set JPEG quality so images do not exceed the size given (in KBytes).
#: If set, this parameter overrides auto compression via compress_news_images_auto_size.
#: The minimum JPEG quality will be 5/100 so it is possible this constraint
#: will not be met.
compress_news_images_max_size = None
#: Rescale images to fit in the device screen dimensions set by the output profile.
#: Ignored if no output profile is set.
scale_news_images_to_device = True
#: Maximum dimensions (w,h) to scale images to. If scale_news_images_to_device is True
#: this is set to the device screen dimensions set by the output profile unless
#: there is no profile set, in which case it is left at whatever value it has been
#: assigned (default None).
scale_news_images = None
#: If set to True then links in downloaded articles that point to other downloaded articles are
#: changed to point to the downloaded copy of the article rather than its original web URL. If you
#: set this to True, you might also need to implement :meth:`canonicalize_internal_url` to work
#: with the URL scheme of your particular website.
resolve_internal_links = False
#: Specify options specific to this recipe. These will be available for the user to customize
#: in the Advanced tab of the Fetch News dialog or at the ebook-convert command line. The options
#: are specified as a dictionary mapping option name to metadata about the option. For example::
#:
#: recipe_specific_options = {
#: 'edition_date': {
#: 'short': 'The issue date to download',
#: 'long': 'Specify a date in the format YYYY-mm-dd to download the issue corresponding to that date',
#: 'default': 'current',
#: }
#: }
#:
#: When the recipe is run, self.recipe_specific_options will be a dict mapping option name to the option value
#: specified by the user. When the option is unspecified by the user, it will have the value specified by 'default'.
#: If no default is specified, the option will not be in the dict at all, when unspecified by the user.
recipe_specific_options = None
#: The simulated browser engine to use when downloading from servers. The default is to use the Python mechanize
#: browser engine, which supports logging in. However, if you don't need logging in, consider changing this
#: to either 'webengine' which uses an actual Chromium browser to do the network requests or 'qt' which
#: uses the Qt Networking backend. Both 'webengine' and 'qt' support HTTP/2, which mechanize does not and
#: are thus harder to fingerprint for bot protection services.
browser_type = 'mechanize'
#: Set to False if you do not want to use gzipped transfers with the mechanize browser.
#: Note that some old servers flake out with gzip.
handle_gzip = True
# See the built-in recipes for examples of these settings.
def short_title(self):
    '''
    Return :attr:`title` passed through ``force_unicode`` using the
    preferred encoding.
    '''
    title = self.title
    return force_unicode(title, preferred_encoding)
[docs]
def is_link_wanted(self, url, tag):
    '''
    Decide whether a link should be followed. Return True to follow it and
    False otherwise. The base implementation always raises
    NotImplementedError, which causes the downloader to ignore this hook.

    :param url: The URL to be followed
    :param tag: The tag from which the URL was derived
    '''
    raise NotImplementedError
[docs]
def get_cover_url(self):
    '''
    Return a :term:`URL` to the cover image for this issue, or `None`.

    The base implementation returns the member `self.cover_url` when it
    exists and `None` when it does not. To have your recipe download a
    cover, either override this method in your subclass or set
    `self.cover_url` before this method is called.
    '''
    try:
        return self.cover_url
    except AttributeError:
        return None
[docs]
def get_feeds(self):
    '''
    Return the list of :term:`RSS` feeds to fetch for this profile. Each
    element of the list must be a 2-element tuple of the form (title, url).
    If title is None or an empty string, the title from the feed is used.
    Override this in your subclass if the recipe has to do some processing
    to work out which feeds to download.

    The base implementation returns ``self.feeds``, truncated to the first
    ``self.test[0]`` entries when ``self.test`` is set, and raises
    NotImplementedError when ``self.feeds`` is empty or None.
    '''
    feeds = self.feeds
    if not feeds:
        raise NotImplementedError()
    return feeds[:self.test[0]] if self.test else feeds
[docs]
def get_url_specific_delay(self, url):
    '''
    Return the delay, in seconds, to apply before downloading ``url``.

    Override this method in your subclass to compute the delay per URL,
    and return ``self.delay`` for URLs you do not want to affect. The base
    implementation returns ``self.delay`` for every URL.

    :return: A floating point number, the delay in seconds.
    '''
    default_delay = self.delay
    return default_delay
[docs]
@classmethod
def print_version(cls, url):
'''
Take a `url` pointing to the webpage with article content and return the
:term:`URL` pointing to the print version of the article. By default does
nothing. For example::
def print_version(self, url):
return url + '?&pagewanted=print'
'''
raise NotImplementedError()
[docs]
@classmethod
def image_url_processor(cls, baseurl, url):
'''
Perform some processing on image urls (perhaps removing size restrictions for
dynamically generated images, etc.) and return the processed URL. Return None
or an empty string to skip fetching the image.
'''
return url
[docs]
def preprocess_image(self, img_data, image_url):
    '''
    Hook for processing downloaded image data. It is called on the raw data
    before any resizing is done and must return the processed raw data, or
    None to skip the image. The base implementation returns ``img_data``
    unchanged.
    '''
    processed = img_data
    return processed
[docs]
def get_browser(self, *args, **kwargs):
    '''
    Return a browser instance used to fetch documents from the web. By
    default this is a `mechanize <https://mechanize.readthedocs.io/en/latest/>`_
    browser instance that supports cookies, ignores robots.txt, handles
    refreshes and has a random common user agent. When
    :attr:`browser_type` is ``'qt'`` or ``'webengine'`` the corresponding
    class from ``calibre.scraper.qt`` is instantiated instead.

    Unless a ``user_agent`` keyword argument is given, the user agent is
    chosen once and remembered in ``self.last_used_user_agent``.

    To customize the browser override this method in your sub-class as::

        def get_browser(self, *a, **kw):
            br = super().get_browser(*a, **kw)
            # Add some headers
            br.addheaders += [
                ('My-Header', 'one'),
                ('My-Header2', 'two'),
            ]
            # Set some cookies
            br.set_cookie('name', 'value')
            br.set_cookie('name2', 'value2', domain='.mydomain.com')
            # Make a POST request with some data
            br.open('https://someurl.com', {'username': 'def', 'password': 'pwd'}).read()
            # Do a login via a simple web form (only supported with mechanize browsers)
            if self.username is not None and self.password is not None:
                br.open('https://www.nytimes.com/auth/login')
                br.select_form(name='login')
                br['USERID'] = self.username
                br['PASSWORD'] = self.password
                br.submit()
            return br
    '''
    if 'user_agent' not in kwargs:
        # More and more news sites are serving JPEG XR images to IE
        agent = (
            getattr(self, 'last_used_user_agent', None)
            or self.calibre_most_common_ua
            or random_user_agent(allow_ie=False)
        )
        kwargs['user_agent'] = self.last_used_user_agent = agent
    user_agent = kwargs['user_agent']
    self.log('Using user agent:', user_agent)

    if self.browser_type == 'mechanize':
        br = browser(*args, **kwargs)
        br.addheaders += [('Accept', '*/*')]
        if self.handle_gzip:
            br.set_handle_gzip(True)
        return br

    # Only import the Qt based scrapers when they are actually requested
    from calibre.scraper.qt import Browser, WebEngineBrowser
    engines = {'qt': Browser, 'webengine': WebEngineBrowser}
    factory = engines[self.browser_type]
    return factory(
        user_agent=user_agent,
        verify_ssl_certificates=kwargs.get('verify_ssl_certificates', False))
[docs]
def clone_browser(self, br):
    '''
    Return a clone of the browser ``br``. Cloned browsers are used for
    multi-threaded downloads, since mechanize is not thread safe. The
    default cloning routines should capture most browser customization,
    but if you do something exotic in your recipe, override this method
    and clone manually.

    Cloned browser instances use the same, thread-safe CookieJar by
    default, unless you have customized cookie handling.

    When ``br`` has no callable ``clone_browser`` attribute, a new browser
    is obtained from :meth:`get_browser` instead.
    '''
    if not callable(getattr(br, 'clone_browser', None)):
        # Uh-oh recipe using something exotic, call get_browser
        return self.get_browser()
    return br.clone_browser()
@property
def cloned_browser(self):
if hasattr(self.get_browser, 'is_base_class_implementation') and self.browser_type == 'mechanize':
# We are using the default get_browser, which means no need to
# clone
br = BasicNewsRecipe.get_browser(self)
else:
br = self.clone_browser(self.browser)
return br
[docs]
def get_article_url(self, article):
    '''
    Override in a subclass to customize extraction of the :term:`URL` that
    points to the content for each article. Return the article URL. It is
    called with `article`, an object representing a parsed article from a
    feed. See `feedparser <https://pythonhosted.org/feedparser/>`_.

    By default it looks for the original link (for feeds syndicated via a
    service like FeedBurner or Pheedo), that is the value of any key ending
    in ``_origlink`` that is an http(s) URL, and returns it if found.
    Otherwise it returns
    `article.link <https://pythonhosted.org/feedparser/reference-entry-link.html>`_,
    falling back to the ``href`` of the first entry of ``article.links``
    whose ``rel`` is ``alternate`` (or absent).
    '''
    schemes = ('http://', 'https://')
    for key in article.keys():
        if not key.endswith('_origlink'):
            continue
        candidate = article[key]
        if candidate and candidate.startswith(schemes):
            return candidate

    link = article.get('link', None)
    if link or not getattr(article, 'links', None):
        return link
    for entry in article.links:
        if entry.get('rel', 'alternate') == 'alternate':
            return entry['href']
    return link
[docs]
def skip_ad_pages(self, soup):
    '''
    Hook that lets a recipe skip ad pages. It is called with the source of
    each downloaded :term:`HTML` file, before any of the cleanup attributes
    like remove_tags, keep_only_tags are applied. Note that
    preprocess_regexps will have already been applied. If the soup
    represents an ad page, return the HTML of the real page. Otherwise
    return None, which is what the base implementation always does.

    `soup`: A `BeautifulSoup <https://www.crummy.com/software/BeautifulSoup/bs4/doc/>`__
    instance containing the downloaded :term:`HTML`.
    '''
    real_page_html = None
    return real_page_html
[docs]
def abort_article(self, msg=None):
    '''
    Call this method inside any of the preprocess methods to abort the
    download for the current article. Useful to skip articles that contain
    inappropriate content, such as pure video articles.

    Always raises ``AbortArticle``, carrying ``msg`` or a default
    (translated) message when ``msg`` is empty.
    '''
    reason = msg or _('Article download aborted')
    raise AbortArticle(reason)
[docs]
def preprocess_raw_html(self, raw_html, url):
    '''
    Hook called with the source of each downloaded :term:`HTML` file,
    before it is parsed into an object tree. raw_html is a unicode string
    representing the raw HTML downloaded from the web. url is the URL from
    which the HTML was downloaded.

    Note that this method acts *before* preprocess_regexps.

    This method must return the processed raw_html as a unicode object.
    The base implementation returns it unchanged.
    '''
    processed = raw_html
    return processed
def preprocess_raw_html_(self, raw_html, url):
    '''
    Run :meth:`preprocess_raw_html` on ``raw_html`` and then, when
    :attr:`auto_cleanup` is set, :meth:`extract_readable_article`.

    Auto cleanup is best effort: if it fails, the failure is logged and the
    HTML returned by :meth:`preprocess_raw_html` is used as is.

    :param raw_html: The raw HTML downloaded from ``url``
    :param url: The URL the HTML was downloaded from
    :return: The processed HTML
    '''
    raw_html = self.preprocess_raw_html(raw_html, url)
    if self.auto_cleanup:
        try:
            raw_html = self.extract_readable_article(raw_html, url)
        except Exception:
            # Catch Exception rather than using a bare except, so that
            # KeyboardInterrupt and SystemExit still propagate
            self.log.exception('Auto cleanup of URL: %r failed'%url)
    return raw_html
[docs]
def preprocess_html(self, soup):
    '''
    Hook called with the source of each downloaded :term:`HTML` file,
    before it is parsed for links and images. It is called after the
    cleanup as specified by remove_tags etc. It can be used to do
    arbitrarily powerful pre-processing on the :term:`HTML`. It should
    return `soup` after processing it; the base implementation returns it
    untouched.

    `soup`: A `BeautifulSoup <https://www.crummy.com/software/BeautifulSoup/bs4/doc/>`__
    instance containing the downloaded :term:`HTML`.
    '''
    processed = soup
    return processed
[docs]
def postprocess_html(self, soup, first_fetch):
    '''
    Hook called with the source of each downloaded :term:`HTML` file, after
    it is parsed for links and images. It can be used to do arbitrarily
    powerful post-processing on the :term:`HTML`. It should return `soup`
    after processing it; the base implementation returns it untouched.

    :param soup: A `BeautifulSoup <https://www.crummy.com/software/BeautifulSoup/bs4/doc/>`__ instance containing the downloaded :term:`HTML`.
    :param first_fetch: True if this is the first page of an article.
    '''
    processed = soup
    return processed
[docs]
def cleanup(self):
    '''
    Called after all articles have been downloaded. Use it to do any
    cleanup like logging out of subscription sites, etc. The base
    implementation does nothing.
    '''
    return None
[docs]
def canonicalize_internal_url(self, url, is_link=True):
    '''
    Return a set of canonical representations of ``url``. The default
    implementation uses just the server hostname and path of the URL
    (with trailing slashes removed from the path), ignoring any query
    parameters, fragments, etc. The canonical representations must be
    unique across all URLs for this news source. If they are not, then
    internal links may be resolved incorrectly.

    A URL that cannot be parsed is logged and yields an empty set.

    :param is_link: Is True if the URL is coming from an internal link in
                    an HTML file. False if the URL is the URL used to
                    download an article. Not used by the default
                    implementation.
    '''
    try:
        parsed = urlparse(url)
    except Exception:
        self.log.error('Failed to parse url: %r, ignoring' % url)
        return frozenset()

    def as_text(value):
        # Parsing a bytes URL yields bytes components
        if isinstance(value, bytes):
            return value.decode('utf-8', 'replace')
        return value

    host = as_text(parsed.netloc)
    path = as_text(parsed.path or '')
    return frozenset([(host, path.rstrip('/'))])
[docs]
def index_to_soup(self, url_or_raw, raw=False, as_tree=False, save_raw=None):
    '''
    Convenience method that takes an URL to the index page and returns
    a `BeautifulSoup <https://www.crummy.com/software/BeautifulSoup/bs4/doc>`__
    of it.

    `url_or_raw`: Either a URL or the downloaded index page as a string.
    It is treated as a URL when it starts with a scheme followed by
    ``://``, in which case it is fetched with a clone of ``self.browser``.

    :param raw: If True, return the fetched (or passed in) data as is,
                without decoding or parsing it.
    :param as_tree: If True, parse with ``html5_parser`` and return its
                    result instead of a BeautifulSoup.
    :param save_raw: If set, a path to which the decoded and cleaned
                     markup is written, encoded as UTF-8.
    '''
    scheme_pat = br'\w+://' if isinstance(url_or_raw, bytes) else r'\w+://'
    if re.match(scheme_pat, url_or_raw):
        # We may be called in a thread (in the skip_ad_pages method), so
        # clone the browser to be safe. We cannot use self.cloned_browser
        # as it may or may not actually clone the browser, depending on if
        # the recipe implements get_browser() or not
        fetcher = self.clone_browser(self.browser)
        opener = getattr(fetcher, 'open_novisit', fetcher.open)
        with closing(opener(url_or_raw, timeout=self.timeout)) as response:
            data = response.read()
        if not data:
            raise RuntimeError('Could not fetch index from %s'%url_or_raw)
    else:
        data = url_or_raw

    if raw:
        return data

    # An explicit recipe encoding only applies to data that is not yet text
    if not isinstance(data, str) and self.encoding:
        if callable(self.encoding):
            data = self.encoding(data)
        else:
            data = data.decode(self.encoding, 'replace')

    from calibre.ebooks.chardet import strip_encoding_declarations, xml_to_unicode
    from calibre.utils.cleantext import clean_xml_chars
    if isinstance(data, str):
        data = strip_encoding_declarations(data)
    else:
        data = xml_to_unicode(data, strip_encoding_pats=True, resolve_entities=True)[0]
    data = clean_xml_chars(data)

    if save_raw:
        with open(save_raw, 'wb') as out:
            out.write(data.encode('utf-8'))

    if not as_tree:
        return BeautifulSoup(data)
    from html5_parser import parse
    return parse(data)
[docs]
def extract_readable_article(self, html, url):
    '''
    Extract the main article content from ``html`` and clean it up.
    Based on the original readability algorithm by Arc90.

    The extracted content is wrapped in a complete HTML document whose
    ``<title>`` is the extracted title. If no element has the title as its
    text and the document contains no ``<h1>``/``<h2>``, an ``<h2>`` with
    the title is inserted at the start of the body.

    :param html: The HTML to extract the article from
    :param url: The URL of the article, passed on to the readability code
    :return: The resulting HTML document serialized as a unicode string
             (a single string, not a tuple)
    '''
    from lxml.html import document_fromstring, fragment_fromstring, tostring
    from calibre.ebooks.readability import readability

    doc = readability.Document(html, self.log, url=url,
                               keep_elements=self.auto_cleanup_keep)
    article_html = doc.summary()
    extracted_title = doc.title()

    try:
        frag = fragment_fromstring(article_html)
    except Exception:
        # The summary is not a single element fragment: parse it as a whole
        # document and use its body. Exception (not a bare except) so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        frag = document_fromstring(article_html).xpath('//body')[-1]

    if frag.tag == 'html':
        root = frag
    elif frag.tag == 'body':
        root = document_fromstring(
            '<html><head><title>%s</title></head></html>' %
            extracted_title)
        root.append(frag)
    else:
        root = document_fromstring(
            '<html><head><title>%s</title></head><body/></html>' %
            extracted_title)
        root.xpath('//body')[0].append(frag)

    body = root.xpath('//body')[0]
    # any() stops scanning at the first element whose text is the title
    has_title = any(x.text == extracted_title for x in body.iterdescendants())
    inline_titles = body.xpath('//h1|//h2')
    if not has_title and not inline_titles:
        heading = body.makeelement('h2')
        heading.text = extracted_title
        body.insert(0, heading)

    return tostring(root, encoding='unicode')
[docs]
def sort_index_by(self, index, weights):
    '''
    Convenience method to sort the titles in `index` according to `weights`.
    `index` is sorted in place. Returns `index`.

    `index`: A list of titles.

    `weights`: A dictionary that maps titles to weights. If any titles
    in index are not in weights, they are assumed to have a weight of 0.
    '''
    weight_of = dict(weights)
    index.sort(key=lambda title: weight_of.get(title, 0))
    return index
[docs]
def parse_index(self):
    '''
    This method should be implemented in recipes that parse a website
    instead of feeds to generate a list of articles. Typical uses are for
    news sources that have a "Print Edition" webpage that lists all the
    articles in the current print edition. If this function is implemented,
    it will be used in preference to :meth:`BasicNewsRecipe.parse_feeds`.

    It must return a list. Each element of the list must be a 2-element tuple
    of the form ``('feed title', list of articles)``.

    Each list of articles must contain dictionaries of the form::

        {
        'title'       : article title,
        'url'         : URL of print version,
        'date'        : The publication date of the article as a string,
        'description' : A summary of the article
        'content'     : The full article (can be an empty string). Obsolete
                        do not use, instead save the content to a temporary
                        file and pass a file:///path/to/temp/file.html as
                        the URL.
        }

    For an example, see the recipe for downloading `The Atlantic`.
    In addition, you can add 'author' for the author of the article.

    If you want to abort processing for some reason and have
    calibre show the user a simple message instead of an error, call
    :meth:`abort_recipe_processing`.

    The base implementation raises NotImplementedError.
    '''
    raise NotImplementedError
[docs]
def abort_recipe_processing(self, msg):
    '''
    Causes the recipe download system to abort the download of this recipe,
    displaying a simple feedback message to the user.

    Always raises ``ConversionUserFeedBack`` with a heading naming this
    recipe's title and ``msg`` as the message.
    '''
    from calibre.ebooks.conversion import ConversionUserFeedBack
    heading = _('Failed to download %s')%self.title
    raise ConversionUserFeedBack(heading, msg)
[docs]
def get_obfuscated_article(self, url):
    '''
    If you set `articles_are_obfuscated` this method is called with
    every article URL. It should return the path to a file on the filesystem
    that contains the article HTML. That file is processed by the recursive
    HTML fetching engine, so it can contain links to pages/images on the web.

    Alternately, you can return a dictionary of the form:
    {'data': <HTML data>, 'url': <the resolved URL of the article>}. This avoids
    needing to create temporary files. The `url` key in the dictionary is useful if
    the effective URL of the article is different from the URL passed into this method,
    for example, because of redirects. It can be omitted if the URL is unchanged.

    This method is typically useful for sites that try to make it difficult to
    access article content automatically. The base implementation raises
    NotImplementedError.
    '''
    raise NotImplementedError
[docs]
def add_toc_thumbnail(self, article, src):
    '''
    Record the Table of Contents thumbnail for an article.

    Call this from populate_article_metadata with the src attribute of an
    <img> tag from the article. Whether the thumbnail is actually used is
    device dependent (currently only used by the Kindles). The image must be
    one that was successfully downloaded, otherwise it is ignored.

    :param article: The article object; nothing happens unless it has a
                    ``toc_thumbnail`` attribute.
    :param src: The image path. Backslashes are treated as forward slashes
                and everything before the first ``feed`` is dropped.
    '''
    if src and hasattr(article, 'toc_thumbnail'):
        normalized = src.replace('\\', '/')
        downloaded = re.search(
            r'feed_\d+/article_\d+/images/img', normalized, flags=re.I)
        if downloaded is None:
            # Not a path inside the downloaded feed/article/images tree
            self.log.warn('Ignoring invalid TOC thumbnail image: %r'%normalized)
        else:
            article.toc_thumbnail = re.sub(
                r'^.*?feed', 'feed', normalized, flags=re.IGNORECASE)
[docs]
def populate_article_metadata(self, article, soup, first):
    '''
    Hook invoked for every downloaded HTML page that belongs to an article.
    Override it to extract article metadata (author, summary, ...) from the
    parsed HTML. The default implementation does nothing.

    :param article: A object of class :class:`calibre.web.feeds.Article`.
        If you change the summary, remember to also change the text_summary
    :param soup: Parsed HTML belonging to this article
    :param first: True iff the parsed HTML is the first page of the article.
    '''
    return None
[docs]
def postprocess_book(self, oeb, opts, log):
    '''
    Hook for running any needed post processing on the parsed downloaded
    e-book. The default implementation does nothing.

    :param oeb: An OEBBook object
    :param opts: Conversion options
    :param log: Logging object
    '''
    return None
def __init__(self, options, log, progress_reporter):
    '''
    Initialize the recipe.

    Copies the relevant command line options onto the instance, creates the
    browser, builds the ``web2disk`` option object used for fetching
    articles and resolves the recipe specific options.

    :param options: Parsed commandline options
    :param log: Logging object
    :param progress_reporter: A Callable that takes two arguments: progress (a number between 0 and 1) and a string message. The message should be optional.
    :raises ValueError: If the recipe needs a subscription (and it is not
        ``'optional'``) but no username/password was supplied, or if a
        recipe specific option is not of the form ``name:value``.
    :raises KeyError: If a recipe specific option is not declared by the recipe.
    '''
    self.log = ThreadSafeWrapper(log)
    # Titles given as bytes are decoded as UTF-8
    if not isinstance(self.title, str):
        self.title = str(self.title, 'utf-8', 'replace')

    self.debug = options.verbose > 1
    self.output_dir = os.path.abspath(os.getcwd())
    self.verbose = options.verbose
    self.test = options.test
    # In test mode without explicit limits use (2, 2): test[0] caps the
    # number of feeds (see build_index), test[1] the articles per feed.
    if self.test and not isinstance(self.test, tuple):
        self.test = (2, 2)
    self.username = options.username
    self.password = options.password
    self.lrf = options.lrf
    self.output_profile = options.output_profile
    self.touchscreen = getattr(self.output_profile, 'touchscreen', False)
    if self.touchscreen:
        self.template_css += self.output_profile.touchscreen_news_css

    if self.test:
        self.max_articles_per_feed = self.test[1]
        self.simultaneous_downloads = min(4, self.simultaneous_downloads)

    if self.debug:
        self.verbose = True
    self.report_progress = progress_reporter

    # Credentials count as missing when either is None or both are empty
    if self.needs_subscription and (
            self.username is None or self.password is None or (
                not self.username and not self.password)):
        if self.needs_subscription != 'optional':
            raise ValueError(_('The "%s" recipe needs a username and password.')%self.title)

    # get_browser() runs after username/password have been stored above
    self.browser = self.get_browser()
    self.image_map, self.image_counter = {}, 1
    self.css_map = {}

    # Command line from which the web2disk option object is parsed below
    web2disk_cmdline = ['web2disk',
        '--timeout', str(self.timeout),
        '--max-recursions', str(self.recursions),
        '--delay', str(self.delay),
        ]

    if self.verbose:
        web2disk_cmdline.append('--verbose')

    if self.no_stylesheets:
        web2disk_cmdline.append('--dont-download-stylesheets')

    for reg in self.match_regexps:
        web2disk_cmdline.extend(['--match-regexp', reg])

    for reg in self.filter_regexps:
        web2disk_cmdline.extend(['--filter-regexp', reg])

    # No device scaling for the 'default' and 'tablet' profiles; otherwise,
    # if requested, scale images to the profile's screen size.
    if options.output_profile.short_name in ('default', 'tablet'):
        self.scale_news_images_to_device = False
    elif self.scale_news_images_to_device:
        self.scale_news_images = options.output_profile.screen_size

    self.web2disk_options = web2disk_option_parser().parse_args(web2disk_cmdline)[0]
    # Copy recipe attributes of the same name onto the fetcher options
    for extra in ('keep_only_tags', 'remove_tags', 'preprocess_regexps',
                  'skip_ad_pages', 'preprocess_html', 'remove_tags_after',
                  'remove_tags_before', 'is_link_wanted',
                  'compress_news_images', 'compress_news_images_max_size',
                  'compress_news_images_auto_size', 'scale_news_images'):
        setattr(self.web2disk_options, extra, getattr(self, extra))

    self.web2disk_options.postprocess_html = self._postprocess_html
    self.web2disk_options.preprocess_image = self.preprocess_image
    self.web2disk_options.encoding = self.encoding
    self.web2disk_options.preprocess_raw_html = self.preprocess_raw_html_
    self.web2disk_options.get_delay = self.get_url_specific_delay

    # A non-zero delay forces downloads to happen one at a time
    if self.delay > 0:
        self.simultaneous_downloads = 1

    self.navbar = templates.TouchscreenNavBarTemplate() if self.touchscreen else \
                templates.NavBarTemplate()

    self.failed_downloads = []
    self.partial_failures = []
    self.aborted_articles = []

    # Keep the declared option metadata and replace recipe_specific_options
    # with a name -> value mapping seeded from the declared defaults.
    self.recipe_specific_options_metadata = rso = self.recipe_specific_options or {}
    self.recipe_specific_options = {k: rso[k]['default'] for k in rso if 'default' in rso[k]}
    # User supplied values have the form 'name:value'
    for x in (options.recipe_specific_option or ()):
        k, sep, v = x.partition(':')
        if not sep:
            raise ValueError(f'{x} is not a valid recipe specific option')
        if k not in rso:
            raise KeyError(f'{k} is not an option supported by: {self.title}')
        self.recipe_specific_options[k] = v
    if self.recipe_specific_options:
        log('Recipe specific options:')
        for k, v in self.recipe_specific_options.items():
            log(' ', f'{k} = {v}')
def _postprocess_html(self, soup, first_fetch, job_info):
    '''
    Clean up a downloaded page before handing it to :meth:`postprocess_html`
    and :meth:`populate_article_metadata`.

    Installed as ``web2disk_options.postprocess_html`` by ``__init__``.

    :param soup: Parsed HTML of the page
    :param first_fetch: Truthy for the first page of an article; only then
                        is the navigation bar inserted
    :param job_info: Falsy, or a ``(url, f, a, feed_len)`` tuple where ``f``
                     and ``a`` index into ``self.feed_objects`` and its articles
    :return: The value returned by :meth:`postprocess_html`, after HTML5
             structural tags have been renamed to ``div``
    '''
    if self.no_stylesheets:
        # Drop linked stylesheets (a missing type/rel is treated as CSS)
        # as well as inline <style> blocks
        for link in soup.findAll('link'):
            if (link.get('type') or 'text/css').lower() == 'text/css' and 'stylesheet' in (link.get('rel') or ('stylesheet',)):
                link.extract()
        for style in soup.findAll('style'):
            style.extract()
    # The override stylesheet goes into <head>, falling back to <body> and
    # then to the first tag of the document
    head = soup.find('head')
    if not head:
        head = soup.find('body')
    if not head:
        head = soup.find(True)
    css = self.template_css + '\n\n' + (self.get_extra_css() or '')
    style = soup.new_tag('style', type='text/css', title='override_css')
    style.append(css)
    head.append(style)
    if first_fetch and job_info:
        url, f, a, feed_len = job_info
        body = soup.find('body')
        if body is not None:
            # Insert the navigation bar as the first child of <body>
            templ = self.navbar.generate(False, f, a, feed_len,
                                         not self.has_single_feed,
                                         url, __appname__,
                                         center=self.center_navbar,
                                         extra_css=self.get_extra_css() or '')
            elem = BeautifulSoup(templ.render(doctype='xhtml').decode('utf-8')).find('div')
            body.insert(0, elem)
            # This is needed because otherwise inserting elements into
            # the soup breaks find()
            soup = BeautifulSoup(soup.decode_contents())
    if self.remove_javascript:
        for script in list(soup.findAll('script')):
            script.extract()
        for o in soup.findAll(onload=True):
            del o['onload']

    for attr in self.remove_attributes:
        for x in soup.findAll(attrs={attr:True}):
            del x[attr]
    for bad_tag in list(soup.findAll(['base', 'iframe', 'canvas', 'embed', 'button',
        'command', 'datalist', 'video', 'audio', 'noscript', 'link', 'meta'])):
        # link tags can be used for preloading causing network activity in
        # calibre viewer. meta tags can do all sorts of crazy things,
        # including http-equiv refresh, viewport shenanigans, etc.
        bad_tag.extract()
    # srcset causes some viewers, like calibre's to load images from the
    # web, and it also possible causes iBooks on iOS to barf, see
    # https://bugs.launchpad.net/bugs/1713986
    for img in soup.findAll('img', srcset=True):
        del img['srcset']

    # Give the recipe a chance to modify the cleaned up page
    ans = self.postprocess_html(soup, first_fetch)

    # Nuke HTML5 tags
    for x in ans.findAll(['article', 'aside', 'header', 'footer', 'nav',
                          'figcaption', 'figure', 'section']):
        # Remember the original tag name in a class before renaming to div.
        # NOTE(review): this relies on get_attribute_list() returning the
        # tag's own list; for a tag without a class attribute the append may
        # be lost -- confirm against the BeautifulSoup version in use.
        x.get_attribute_list('class').append(f'calibre-nuked-tag-{x.name}')
        x.name = 'div'

    if job_info:
        url, f, a, feed_len = job_info
        try:
            article = self.feed_objects[f].articles[a]
        except:
            # Best effort: without the article object metadata is skipped
            self.log.exception('Failed to get article object for postprocessing')
            pass
        else:
            self.populate_article_metadata(article, ans, first_fetch)
    return ans
[docs]
def download(self):
    '''
    Download and pre-process all articles from the feeds in this recipe.
    Call this only once per Recipe instance; calling it repeatedly leads to
    undefined behavior. :meth:`cleanup` is always run afterwards, even when
    the download fails.

    :return: Path to index.html
    '''
    try:
        index_path = self.build_index()
        self.report_progress(1, _('Download finished'))

        # Articles that could not be downloaded at all
        if self.failed_downloads:
            self.log.warning(_('Failed to download the following articles:'))
            for failed_feed, failed_article, details in self.failed_downloads:
                self.log.warning(failed_article.title, 'from', failed_feed.title)
                self.log.debug(failed_article.url)
                self.log.debug(details)

        # Articles for which only some of the links failed
        if self.partial_failures:
            self.log.warning(_('Failed to download parts of the following articles:'))
            for feed_title, article_title, article_url, bad_links in self.partial_failures:
                self.log.warning(article_title + _(' from ') + feed_title)
                self.log.debug(article_url)
                self.log.warning(_('\tFailed links:'))
                for link, link_traceback in bad_links:
                    self.log.warning(link)
                    self.log.debug(link_traceback)
        return index_path
    finally:
        self.cleanup()
@property
def lang_for_html(self):
    '''
    The primary language subtag of ``self.language`` in lower case (for
    example ``'en'`` for ``'en_US'``), or None if the language is ``und``
    or cannot be determined.
    '''
    try:
        lang = self.language.replace('_', '-').partition('-')[0].lower()
    except Exception:
        # Missing or non-string language. Only ordinary errors are handled
        # so that KeyboardInterrupt/SystemExit are not swallowed.
        return None
    return None if lang == 'und' else lang
def feeds2index(self, feeds):
    '''
    Render the top level index page that lists all feeds.

    :param feeds: The feeds to list
    :return: The index template rendered with ``doctype='xhtml'``
    '''
    if self.touchscreen:
        template_class = templates.TouchscreenIndexTemplate
    else:
        template_class = templates.IndexTemplate
    index_template = template_class(lang=self.lang_for_html)
    stylesheet = self.template_css + '\n\n' + (self.get_extra_css() or '')
    date_format = self.timefmt
    generated = index_template.generate(
        self.title, "mastheadImage.jpg", date_format, feeds,
        extra_css=stylesheet)
    return generated.render(doctype='xhtml')
@classmethod
def description_limiter(cls, src):
    '''
    Shorten a description to roughly ``cls.summary_length`` characters.

    The cut is moved forward to the next ``;`` or ``>`` when one is found
    no more than 50 characters past the limit. A horizontal ellipsis is
    appended if anything was cut off.

    :param src: The description; falsy values yield ``''``
    :return: The possibly shortened description
    '''
    if not src:
        return ''
    src = force_unicode(src, 'utf-8')
    limit = cls.summary_length
    fuzz = 50

    def boundary(char):
        # Position of char at/after the limit, or -1 if it is too far away
        found = src.find(char, limit)
        return -1 if (found > 0 and found - limit > fuzz) else found

    cut = max(boundary(';'), boundary('>'))
    if cut < 0:
        cut = limit
    shortened = src[:cut+1]
    if len(shortened) < len(src):
        from calibre.utils.cleantext import clean_xml_chars
        # Truncating the string could cause a dangling UTF-16 half-surrogate, which will cause lxml to barf, clean it
        shortened = clean_xml_chars(shortened) + '\u2026'
    return shortened
def feed2index(self, f, feeds):
    '''
    Render the index page of a single feed.

    If the feed has an image it is first downloaded into the ``images``
    sub-directory of the output directory and ``feed.image_url`` is pointed
    at the local copy. Downloads are remembered in ``self.image_map`` under
    the original image URL, so an image shared by several feeds is only
    fetched once. A failed image download is logged and otherwise ignored.

    :param f: Index of the feed in ``feeds``
    :param feeds: The list of all feeds
    :return: The feed template rendered with ``doctype='xhtml'``
    '''
    feed = feeds[f]
    if feed.image_url is not None:  # Download feed image
        imgdir = os.path.join(self.output_dir, 'images')
        os.makedirs(imgdir, exist_ok=True)

        # Keep the remote URL: feed.image_url is overwritten with the local
        # path below, but the cache must be keyed by what is looked up.
        remote_url = feed.image_url
        if remote_url in self.image_map:
            feed.image_url = self.image_map[remote_url]
        else:
            bn = urlsplit(remote_url).path
            if bn:
                bn = bn.rpartition('/')[-1]
                if bn:
                    img = os.path.join(imgdir, 'feed_image_%d%s'%(self.image_counter, os.path.splitext(bn)[-1]))
                    try:
                        with open(img, 'wb') as fi, closing(self.browser.open(remote_url, timeout=self.timeout)) as r:
                            fi.write(r.read())
                    except Exception:
                        # Best effort: the feed keeps its remote image URL
                        self.log.exception('Failed to download feed image: %s' % remote_url)
                    else:
                        self.image_counter += 1
                        feed.image_url = img
                        self.image_map[remote_url] = img
        if isinstance(feed.image_url, bytes):
            feed.image_url = feed.image_url.decode(sys.getfilesystemencoding(), 'strict')

    templ = (templates.TouchscreenFeedTemplate if self.touchscreen else
                templates.FeedTemplate)
    templ = templ(lang=self.lang_for_html)
    css = self.template_css + '\n\n' +(self.get_extra_css() or '')

    return templ.generate(f, feeds, self.description_limiter,
                          extra_css=css).render(doctype='xhtml')
def _fetch_article(self, url, dir_, f, a, num_of_feeds, preloaded=None):
    '''
    Download one article with a :class:`RecursiveFetcher`.

    :param url: URL of the article
    :param dir_: Directory the article is downloaded into
    :param f: Index of the feed
    :param a: Index of the article within the feed
    :param num_of_feeds: Passed through to the fetcher as part of the job info
    :param preloaded: Optional data registered with the fetcher for ``url``
                      so that it is used in place of fetching it
    :return: ``(result of start_fetch, downloaded paths, failed links)``
    :raises Exception: If nothing was fetched
    '''
    if hasattr(self.get_browser, 'is_base_class_implementation'):
        # We are using the default get_browser, which means no need to
        # clone
        br = BasicNewsRecipe.get_browser(self)
    else:
        br = self.clone_browser(self.browser)
    self.web2disk_options.browser = br

    job_info = (url, f, a, num_of_feeds)
    fetcher = RecursiveFetcher(
        self.web2disk_options, self.log, self.image_map, self.css_map, job_info)
    fetcher.browser = br
    fetcher.base_dir = fetcher.current_dir = dir_
    fetcher.show_progress = False
    fetcher.image_url_processor = self.image_url_processor
    if preloaded is not None:
        fetcher.preloaded_urls[url] = preloaded

    res = fetcher.start_fetch(url)
    path, failures = fetcher.downloaded_paths, fetcher.failed_links
    if res and os.path.exists(res):
        return res, path, failures

    msg = _('Could not fetch article.') + ' '
    msg += (_('The debug traceback is available earlier in this log')
            if self.debug else _('Run with -vv to see the reason'))
    raise Exception(msg)
def fetch_article(self, url, dir, f, a, num_of_feeds):
    '''
    Download a regular article; a thin wrapper around :meth:`_fetch_article`
    whose result is returned unchanged.
    '''
    fetched = self._fetch_article(url, dir, f, a, num_of_feeds)
    return fetched
def fetch_obfuscated_article(self, url, dir, f, a, num_of_feeds):
    '''
    Download an article whose content is supplied by
    :meth:`get_obfuscated_article`.

    That method returns either a dictionary (``data`` plus an optional
    ``url`` that replaces the article URL) or the path to a file, which is
    read and then deleted. Text data is encoded with ``self.encoding``, or
    UTF-8 if no encoding is set, and passed to :meth:`_fetch_article` as
    preloaded content.
    '''
    obfuscated = self.get_obfuscated_article(url)
    if not isinstance(obfuscated, dict):
        with open(obfuscated, 'rb') as stream:
            data = stream.read()
        os.remove(obfuscated)
    else:
        data = obfuscated['data']
        if isinstance(data, str):
            data = data.encode(self.encoding or 'utf-8')
        url = obfuscated.get('url', url)
    return self._fetch_article(url, dir, f, a, num_of_feeds, preloaded=data)
def fetch_embedded_article(self, article, dir, f, a, num_of_feeds):
    '''
    Download an article whose content is embedded in the feed.

    The article is rendered with the ``EmbeddedContent`` template into a
    persistent temporary file, which is then fetched through a ``file:``
    URL like any other article.
    '''
    raw = templates.EmbeddedContent().generate(article).render('html')
    with PersistentTemporaryFile('_feeds2disk.html') as pt:
        pt.write(raw)
        scheme = 'file:' if iswindows else 'file://'
        url = scheme + pt.name
    # The file is fetched only after the with block above has exited
    return self._fetch_article(url, dir, f, a, num_of_feeds)
def remove_duplicate_articles(self, feeds):
    '''
    Remove articles that repeat an earlier article.

    Two articles count as duplicates when they have the same non-empty value
    for any of the attributes named in ``self.ignore_duplicate_articles``.
    The first occurrence is kept.

    :param feeds: The feeds to de-duplicate; articles are removed in place
    :return: ``feeds``, or a new list without the empty feeds if
             ``self.remove_empty_feeds`` is set
    '''
    values_seen = defaultdict(set)
    duplicates = []
    for feed in feeds:
        for article in feed:
            for key in self.ignore_duplicate_articles:
                value = getattr(article, key)
                known = values_seen[key]
                if not value:
                    continue
                if value in known:
                    duplicates.append((feed, article))
                else:
                    known.add(value)

    for feed, article in duplicates:
        self.log.debug('Removing duplicate article: %s from section: %s'%(
            article.title, feed.title))
        feed.remove_article(article)

    if self.remove_empty_feeds:
        return [feed for feed in feeds if len(feed) > 0]
    return feeds
def build_index(self):
    '''
    Fetch the feeds, download all their articles and write the index pages
    and the OPF for the downloaded content.

    :return: Path to the top level ``index.html`` in the output directory
    :raises ValueError: If no feeds were found
    '''
    self.report_progress(0, _('Fetching feeds...'))
    feeds = None
    # Prefer parse_index(); fall back to parse_feeds() when the recipe does
    # not implement it
    try:
        feeds = feeds_from_index(self.parse_index(), oldest_article=self.oldest_article,
                                 max_articles_per_feed=self.max_articles_per_feed,
                                 log=self.log)
        self.report_progress(0, _('Got feeds from index page'))
    except NotImplementedError:
        pass

    if feeds is None:
        feeds = self.parse_feeds()

    if not feeds:
        raise ValueError('No articles found, aborting')

    if self.ignore_duplicate_articles is not None:
        feeds = self.remove_duplicate_articles(feeds)

    self.report_progress(0, _('Trying to download cover...'))
    self.download_cover()
    self.report_progress(0, _('Generating masthead...'))
    self.resolve_masthead()

    # In test mode only the first test[0] feeds are processed
    if self.test:
        feeds = feeds[:self.test[0]]
    self.has_single_feed = len(feeds) == 1

    index = os.path.join(self.output_dir, 'index.html')

    html = self.feeds2index(feeds)
    with open(index, 'wb') as fi:
        fi.write(html)

    self.jobs = []

    if self.reverse_article_order:
        for feed in feeds:
            if hasattr(feed, 'reverse'):
                feed.reverse()

    self.feed_objects = feeds
    # Create one download job per article, each with its own
    # feed_<f>/article_<a> directory
    for f, feed in enumerate(feeds):
        feed_dir = os.path.join(self.output_dir, 'feed_%d'%f)
        if not os.path.isdir(feed_dir):
            os.makedirs(feed_dir)

        for a, article in enumerate(feed):
            if a >= self.max_articles_per_feed:
                break
            art_dir = os.path.join(feed_dir, 'article_%d'%a)
            if not os.path.isdir(art_dir):
                os.makedirs(art_dir)
            # Use the print version if the recipe provides one; an article
            # for which print_version() fails or returns nothing is skipped
            try:
                url = self.print_version(article.url)
            except NotImplementedError:
                url = article.url
            except:
                self.log.exception('Failed to find print version for: '+article.url)
                url = None
            if not url:
                continue
            # Pick the fetch function and its first argument: embedded
            # content takes the article itself, the others take the URL
            func, arg = (self.fetch_embedded_article, article) \
                        if self.use_embedded_content or (self.use_embedded_content is None and feed.has_embedded_content()) \
                        else \
                        ((self.fetch_obfuscated_article if self.articles_are_obfuscated
                          else self.fetch_article), url)
            req = WorkRequest(func, (arg, art_dir, f, a, len(feed)),
                                  {}, (f, a), self.article_downloaded,
                                  self.error_in_article_download)
            req.feed = feed
            req.article = article
            req.feed_dir = feed_dir
            self.jobs.append(req)

    self.jobs_done = 0
    tp = ThreadPool(self.simultaneous_downloads)
    for req in self.jobs:
        tp.putRequest(req, block=True, timeout=0)

    self.report_progress(0, ngettext(
        'Starting download in a single thread...',
        'Starting download [{} threads]...', self.simultaneous_downloads).format(self.simultaneous_downloads))
    # Poll the pool until it reports that no results are pending
    while True:
        try:
            tp.poll()
            time.sleep(0.1)
        except NoResultsPending:
            break

    # The per feed index pages are written after the downloads have finished
    for f, feed in enumerate(feeds):
        html = self.feed2index(f,feeds)
        feed_dir = os.path.join(self.output_dir, 'feed_%d'%f)
        with open(os.path.join(feed_dir, 'index.html'), 'wb') as fi:
            fi.write(html)
    self.create_opf(feeds)
    self.report_progress(1, _('Feeds downloaded to %s')%index)

    return index
def _download_cover(self):
    '''
    Obtain the cover and save it as ``cover.jpg`` in the output directory.

    The value returned by :meth:`get_cover_url` may be a file-like object,
    the path of a readable local file, or a URL. If its name ends in
    ``.pdf`` the cover embedded in the PDF metadata is used. Borders are
    added when ``self.cover_margins`` asks for them. ``self.cover_path``
    is set to the saved file, or left as None if there is no cover.
    '''
    self.cover_path = None
    try:
        cu = self.get_cover_url()
    except Exception as err:
        self.log.error(_('Could not download cover: %s')%as_unicode(err))
        self.log.debug(traceback.format_exc())
        return
    if not cu:
        return

    cdata = None
    if hasattr(cu, 'read'):
        # File-like object: from here on only its name is needed
        cdata = cu.read()
        cu = getattr(cu, 'name', 'cover.jpg')
    elif os.access(cu, os.R_OK):
        with open(cu, 'rb') as local_file:
            cdata = local_file.read()
    else:
        self.report_progress(1, _('Downloading cover from %s')%cu)
        with closing(self.browser.open(cu, timeout=self.timeout)) as response:
            cdata = response.read()
    if not cdata:
        return

    file_name = cu.split('/')[-1]
    ext = file_name.rpartition('.')[-1].lower().strip()
    if ext == 'pdf':
        from calibre.ebooks.metadata.pdf import get_metadata
        mi = get_metadata(io.BytesIO(cdata))
        has_cover = mi.cover_data and mi.cover_data[1]
        cdata = mi.cover_data[1] if has_cover else None
    if not cdata:
        return

    if self.cover_margins[0] or self.cover_margins[1]:
        horizontal, vertical = self.cover_margins[0], self.cover_margins[1]
        bordered = add_borders_to_image(
            cdata, left=horizontal, right=horizontal,
            top=vertical, bottom=vertical,
            border_color=self.cover_margins[2])
        cdata = image_to_data(bordered)

    cpath = os.path.join(self.output_dir, 'cover.jpg')
    save_cover_data_to(cdata, cpath)
    self.cover_path = cpath
def download_cover(self):
    '''
    Download the cover, never failing the whole recipe because of it.

    Any error raised by :meth:`_download_cover` is logged and leaves
    ``self.cover_path`` set to None. Only ordinary errors are handled:
    KeyboardInterrupt and SystemExit propagate so that the download can
    still be interrupted.
    '''
    self.cover_path = None
    try:
        self._download_cover()
    except Exception:
        self.log.exception('Failed to download cover')
        self.cover_path = None
def _download_masthead(self, mu):
if hasattr(mu, 'rpartition'):
ext = mu.rpartition('.')[-1]
if '?' in ext:
ext = ''
else:
ext = mu.name.rpartition('.')[-1]
ext = ext.lower() if ext else 'jpg'
mpath = os.path.join(self.output_dir, 'masthead_source.'+ext)
outfile = os.path.join(self.output_dir, 'mastheadImage.jpg')
if hasattr(mu, 'read'):
with open(mpath, 'wb') as mfile:
mfile.write(mu.read())
elif os.access(mu, os.R_OK):
with open(mpath, 'wb') as mfile:
mfile.write(open(mu, 'rb').read())
else:
with open(mpath, 'wb') as mfile, closing(self.browser.open(mu, timeout=self.timeout)) as r:
mfile.write(r.read())
self.report_progress(1, _('Masthead image downloaded'))
self.prepare_masthead_image(mpath, outfile)
self.masthead_path = outfile
if os.path.exists(mpath):
os.remove(mpath)
def download_masthead(self, url):
    '''
    Download the masthead image from ``url``, logging instead of raising
    when :meth:`_download_masthead` fails.

    Only ordinary errors are handled; KeyboardInterrupt and SystemExit
    propagate so that the download can still be interrupted.

    :param url: Passed unchanged to :meth:`_download_masthead`
    '''
    try:
        self._download_masthead(url)
    except Exception:
        self.log.exception("Failed to download supplied masthead_url")
def resolve_masthead(self):
    '''
    Make sure a masthead image exists and set ``self.masthead_path``.

    The URL returned by :meth:`get_masthead_url` is tried first. If there
    is none, or downloading it fails, a default masthead is generated with
    :meth:`default_masthead_image`. If that fails as well,
    ``self.masthead_path`` is None. Failures are logged, not raised; only
    ordinary errors are handled, so KeyboardInterrupt and SystemExit
    propagate.
    '''
    self.masthead_path = None

    try:
        murl = self.get_masthead_url()
    except Exception:
        self.log.exception('Failed to get masthead url')
        murl = None

    if murl is not None:
        # Try downloading the user-supplied masthead_url
        # Failure sets self.masthead_path to None
        self.download_masthead(murl)
    if self.masthead_path is None:
        self.log.info("Synthesizing mastheadImage")
        self.masthead_path = os.path.join(self.output_dir, 'mastheadImage.jpg')
        try:
            self.default_masthead_image(self.masthead_path)
        except Exception:
            self.log.exception('Failed to generate default masthead image')
            self.masthead_path = None
[docs]
def default_cover(self, cover_file):
    '''
    Create a generic cover for recipes that don't have a cover

    The cover shows the recipe title and the current date formatted with
    ``self.timefmt`` (with any square brackets removed).

    :param cover_file: A writable binary file-like object; the image data
                       is written to it and it is flushed
    :return: True on success, False if generating the cover failed (the
             failure is logged). KeyboardInterrupt and SystemExit are not
             treated as failures and propagate.
    '''
    try:
        from calibre.ebooks.covers import create_cover
        title = self.title if isinstance(self.title, str) else \
                self.title.decode(preferred_encoding, 'replace')
        date = strftime(self.timefmt).replace('[', '').replace(']', '')
        img_data = create_cover(title, [date])
        cover_file.write(img_data)
        cover_file.flush()
    except Exception:
        self.log.exception('Failed to generate default cover')
        return False
    return True
# Width and height of the masthead image, as passed to generate_masthead()
# and prepare_masthead_image() by the methods below (presumably in pixels
# -- TODO confirm).
MI_WIDTH = 600
MI_HEIGHT = 60
def default_masthead_image(self, out_path):
    '''
    Generate a masthead image from the title returned by
    :meth:`get_masthead_title`, using ``MI_WIDTH`` x ``MI_HEIGHT``.

    :param out_path: Passed to ``generate_masthead`` as ``output_path``
    '''
    from calibre.ebooks import generate_masthead
    masthead_title = self.get_masthead_title()
    generate_masthead(
        masthead_title,
        output_path=out_path,
        width=self.MI_WIDTH,
        height=self.MI_HEIGHT,
    )
def prepare_masthead_image(self, path_to_image, out_path):
    '''
    Hand the image at ``path_to_image`` to the module level
    ``prepare_masthead_image`` helper together with ``out_path`` and the
    ``MI_WIDTH`` / ``MI_HEIGHT`` of this recipe.
    '''
    width, height = self.MI_WIDTH, self.MI_HEIGHT
    prepare_masthead_image(path_to_image, out_path, width, height)
[docs]
def publication_date(self):
    '''
    Override this method to set the date on which this issue was published.
    The default is the moment of download. The return value must be a
    :class:`datetime.datetime` object.
    '''
    moment_of_download = nowf()
    return moment_of_download
def create_opf(self, feeds, dir=None):
    '''
    Write ``index.opf`` and ``index.ncx`` describing the downloaded feeds.

    Builds the metadata (title, comments listing the article titles,
    language, publication date), the manifest (feed directories, index
    pages, cover and masthead), the spine and the table of contents. A
    navigation bar is also appended to the last page of every downloaded
    article. If no cover was downloaded a default one is generated as
    ``cover.jpg``.

    :param feeds: The list of feeds that were downloaded
    :param dir: Directory to write into; defaults to ``self.output_dir``
    :raises Exception: If ``feeds`` is empty
    '''
    if dir is None:
        dir = self.output_dir
    title = self.short_title()
    pdate = self.publication_date()
    if self.output_profile.periodical_date_in_title:
        title += strftime(self.timefmt, pdate)
    mi = MetaInformation(title, [__appname__])
    mi.publisher = __appname__
    mi.author_sort = __appname__
    if self.publication_type:
        mi.publication_type = 'periodical:'+self.publication_type+':'+self.short_title()
    mi.timestamp = nowf()

    # Collect each article title once for the comments; titles of aborted
    # and failed articles are seeded into aseen so they are left out
    article_titles, aseen = [], set()
    for (af, aa) in self.aborted_articles:
        aseen.add(aa.title)
    for (ff, fa, tb) in self.failed_downloads:
        aseen.add(fa.title)
    for f in feeds:
        for a in f:
            if a.title and a.title not in aseen:
                aseen.add(a.title)
                article_titles.append(force_unicode(a.title, 'utf-8'))

    desc = self.description
    if not isinstance(desc, str):
        desc = desc.decode('utf-8', 'replace')
    mi.comments = (_('Articles in this issue:'
        ) + '\n\n' + '\n\n'.join(article_titles)) + '\n\n' + desc

    language = canonicalize_lang(self.language)
    if language is not None:
        mi.language = language
    mi.pubdate = pdate
    opf_path = os.path.join(dir, 'index.opf')
    ncx_path = os.path.join(dir, 'index.ncx')

    opf = OPFCreator(dir, mi)
    # Add mastheadImage entry to <guide> section
    mp = getattr(self, 'masthead_path', None)
    if mp is not None and os.access(mp, os.R_OK):
        from calibre.ebooks.metadata.opf2 import Guide
        ref = Guide.Reference(os.path.basename(self.masthead_path), os.getcwd())
        ref.type = 'masthead'
        ref.title = 'Masthead Image'
        opf.guide.append(ref)

    manifest = [os.path.join(dir, 'feed_%d'%i) for i in range(len(feeds))]
    manifest.append(os.path.join(dir, 'index.html'))
    manifest.append(os.path.join(dir, 'index.ncx'))

    # Get cover
    cpath = getattr(self, 'cover_path', None)
    if cpath is None:
        # Close the file once the default cover has been written instead
        # of leaving the handle open
        with open(os.path.join(dir, 'cover.jpg'), 'wb') as pf:
            if self.default_cover(pf):
                cpath = pf.name
    if cpath is not None and os.access(cpath, os.R_OK):
        opf.cover = cpath
        manifest.append(cpath)

    # Get masthead
    mpath = getattr(self, 'masthead_path', None)
    if mpath is not None and os.access(mpath, os.R_OK):
        manifest.append(mpath)

    opf.create_manifest_from_files_in(manifest)
    # Give the NCX and the masthead image fixed manifest ids
    for mani in opf.manifest:
        if mani.path.endswith('.ncx'):
            mani.id = 'ncx'
        if mani.path.endswith('mastheadImage.jpg'):
            mani.id = 'masthead-image'

    entries = ['index.html']
    toc = TOC(base_path=dir)
    self.play_order_counter = 0
    self.play_order_map = {}

    # Maps canonicalized article URLs to the relative paths of the
    # downloaded articles
    self.article_url_map = aumap = defaultdict(set)

    def feed_index(num, parent):
        # Add the downloaded articles of feed number num to the spine
        # entries and, below parent, to the table of contents
        f = feeds[num]
        for j, a in enumerate(f):
            if getattr(a, 'downloaded', False):
                adir = 'feed_%d/article_%d/'%(num, j)
                auth = a.author
                if not auth:
                    auth = None
                desc = a.text_summary
                if not desc:
                    desc = None
                else:
                    desc = self.description_limiter(desc)
                tt = a.toc_thumbnail if a.toc_thumbnail else None
                entries.append('%sindex.html'%adir)
                po = self.play_order_map.get(entries[-1], None)
                if po is None:
                    self.play_order_counter += 1
                    po = self.play_order_counter
                arelpath = '%sindex.html'%adir
                for curl in self.canonicalize_internal_url(a.orig_url, is_link=False):
                    aumap[curl].add(arelpath)
                article_toc_entry = parent.add_item(arelpath, None,
                        a.title if a.title else _('Untitled article'),
                        play_order=po, author=auth,
                        description=desc, toc_thumbnail=tt)
                for entry in a.internal_toc_entries:
                    anchor = entry.get('anchor')
                    if anchor:
                        self.play_order_counter += 1
                        po += 1
                        article_toc_entry.add_item(
                            arelpath, entry['anchor'], entry['title'] or _('Unknown section'),
                            play_order=po
                        )
                # last ends up as the final page of the article: its index
                # page, or the last sub page if there are any
                last = os.path.join(self.output_dir, ('%sindex.html'%adir).replace('/', os.sep))
                for sp in a.sub_pages:
                    prefix = os.path.commonprefix([opf_path, sp])
                    relp = sp[len(prefix):]
                    entries.append(relp.replace(os.sep, '/'))
                    last = sp

                if os.path.exists(last):
                    # Append the bottom navigation bar to the final page
                    with open(last, 'rb') as fi:
                        src = fi.read().decode('utf-8')
                    soup = BeautifulSoup(src)
                    body = soup.find('body')
                    if body is not None:
                        # Two '..' components per 'link<N>' in the path
                        prefix = '/'.join('..'for i in range(2*len(re.findall(r'link\d+', last))))
                        templ = self.navbar.generate(True, num, j, len(f),
                                        not self.has_single_feed,
                                        a.orig_url, __appname__, prefix=prefix,
                                        center=self.center_navbar)
                        elem = BeautifulSoup(templ.render(doctype='xhtml').decode('utf-8')).find('div')
                        body.insert(len(body.contents), elem)
                        with open(last, 'wb') as fi:
                            fi.write(str(soup).encode('utf-8'))

    if len(feeds) == 0:
        raise Exception('All feeds are empty, aborting.')

    if len(feeds) > 1:
        # One TOC section per feed, with its articles nested below it
        for i, f in enumerate(feeds):
            entries.append('feed_%d/index.html'%i)
            po = self.play_order_map.get(entries[-1], None)
            if po is None:
                self.play_order_counter += 1
                po = self.play_order_counter
            auth = getattr(f, 'author', None)
            if not auth:
                auth = None
            desc = getattr(f, 'description', None)
            if not desc:
                desc = None
            feed_index(i, toc.add_item('feed_%d/index.html'%i, None,
                f.title, play_order=po, description=desc, author=auth))

    else:
        # A single feed: its articles go directly into the top level TOC
        entries.append('feed_%d/index.html'%0)
        feed_index(0, toc)

    for i, p in enumerate(entries):
        entries[i] = os.path.join(dir, p.replace('/', os.sep))
    opf.create_spine(entries)
    opf.set_toc(toc)

    with open(opf_path, 'wb') as opf_file, open(ncx_path, 'wb') as ncx_file:
        opf.render(opf_file, ncx_file)
def article_downloaded(self, request, result):
    '''
    Callback run when an article has been downloaded successfully.

    Renames the downloaded file to ``index.html`` in its directory, marks
    the article as downloaded (keeping the original URL in ``orig_url`` and
    pointing ``url`` at the local copy) and reports progress.

    :param request: The work request; provides ``requestID``, ``article``
                    and ``feed``
    :param result: ``(path of the downloaded file, downloaded paths, failed
                   links)`` as returned by :meth:`_fetch_article`
    '''
    index = os.path.join(os.path.dirname(result[0]), 'index.html')
    if index != result[0]:
        # os.replace() overwrites an existing index.html in one step
        os.replace(result[0], index)
    a = request.requestID[1]

    article = request.article
    self.log.debug('Downloaded article:', article.title, 'from', article.url)
    article.orig_url = article.url
    article.url = 'article_%d/index.html'%a
    article.downloaded = True
    # The first downloaded path is the article itself
    article.sub_pages = result[1][1:]
    self.jobs_done += 1
    self.report_progress(float(self.jobs_done)/len(self.jobs),
        _('Article downloaded: %s')%force_unicode(article.title))
    if result[2]:
        # Record the original URL: article.url now points at the local file
        self.partial_failures.append((request.feed.title, article.title, article.orig_url, result[2]))
def error_in_article_download(self, request, traceback):
    '''
    Callback run when downloading an article raised an exception.

    A traceback containing a line that starts with ``AbortArticle:`` means
    the download was aborted; the article is then added to
    ``self.aborted_articles``. Anything else is recorded in
    ``self.failed_downloads``. Progress is reported in both cases.

    :param request: The work request; provides ``article`` and ``feed``
    :param traceback: The formatted traceback of the exception
    '''
    self.jobs_done += 1
    article = request.article
    was_aborted = bool(traceback) and re.search(
        '^AbortArticle:', traceback, flags=re.M) is not None

    if was_aborted:
        self.log.warn('Aborted download of article:', article.title,
                      'from', article.url)
        self.report_progress(float(self.jobs_done)/len(self.jobs),
            _('Article download aborted: %s')%force_unicode(article.title))
        self.aborted_articles.append((request.feed, article))
        return

    self.log.error('Failed to download article:', article.title,
                   'from', article.url)
    self.log.debug(traceback)
    self.log.debug('\n')
    self.report_progress(float(self.jobs_done)/len(self.jobs),
        _('Article download failed: %s')%force_unicode(article.title))
    self.failed_downloads.append((request.feed, article, traceback))
[docs]
def parse_feeds(self):
    '''
    Create a list of articles from the list of feeds returned by :meth:`BasicNewsRecipe.get_feeds`.
    Return a list of :class:`Feed` objects.

    Each entry returned by ``get_feeds`` is either a URL or a
    ``(title, url)`` pair. A feed that cannot be fetched or parsed is
    replaced by an empty placeholder feed whose description holds the
    error. Empty feeds are dropped when ``self.remove_empty_feeds`` is set.
    '''
    feed_specs = self.get_feeds()
    parsed_feeds = []
    br = self.browser
    for spec in feed_specs:
        if isinstance(spec, string_or_bytes):
            title, url = None, spec
        else:
            title, url = spec
        if isinstance(title, bytes):
            title = title.decode('utf-8')
        if isinstance(url, bytes):
            url = url.decode('utf-8')
        if url.startswith('feed://'):
            # feed:// URLs are fetched over http
            url = 'http'+url[4:]
        self.report_progress(0, _('Fetching feed')+' %s...'%(title or url))
        try:
            purl = urlparse(url, allow_fragments=False)
            if purl.username or purl.password:
                # Take the credentials out of the URL and, if both are
                # present, register them with the browser instead
                hostname = purl.hostname
                if purl.port:
                    hostname += f':{purl.port}'
                url = purl._replace(netloc=hostname).geturl()
                if purl.username and purl.password:
                    br.add_password(url, purl.username, purl.password)
            with closing(br.open_novisit(url, timeout=self.timeout)) as response:
                raw = response.read()
            parsed_feeds.append(feed_from_xml(
                raw, title=title, log=self.log,
                oldest_article=self.oldest_article,
                max_articles_per_feed=self.max_articles_per_feed,
                get_article_url=self.get_article_url
            ))
        except Exception as err:
            placeholder = Feed()
            msg = 'Failed feed: %s'%(title or url)
            placeholder.populate_from_preparsed_feed(msg, [])
            placeholder.description = as_unicode(err)
            parsed_feeds.append(placeholder)
            self.log.exception(msg)
        delay = self.get_url_specific_delay(url)
        if delay > 0:
            time.sleep(delay)

    empty_feeds = [fd for fd in parsed_feeds if len(fd) == 0 and self.remove_empty_feeds]
    for fd in empty_feeds:
        parsed_feeds.remove(fd)

    return parsed_feeds
[docs]
@classmethod
def tag_to_string(self, tag, use_alt=True, normalize_whitespace=True):
    '''
    Convenience method to take a
    `BeautifulSoup <https://www.crummy.com/software/BeautifulSoup/bs4/doc/>`_
    :code:`Tag` and extract the text from it recursively, including any CDATA sections
    and alt tag attributes. Return a possibly empty Unicode string.

    `use_alt`: If `True` try to use the alt attribute for tags that don't
    have any textual content

    `normalize_whitespace`: If `True` replace every run of whitespace in the
    result with a single space

    `tag`: `BeautifulSoup <https://www.crummy.com/software/BeautifulSoup/bs4/doc/>`_
    :code:`Tag`. `None` gives an empty string, a string is returned
    unchanged, and lxml elements are supported as well.

    Both options also apply to nested tags.
    '''
    if tag is None:
        return ''
    if isinstance(tag, string_or_bytes):
        return tag
    if callable(getattr(tag, 'xpath', None)) and not hasattr(tag, 'contents'):  # a lxml tag
        from lxml.etree import tostring
        ans = tostring(tag, method='text', encoding='unicode', with_tail=False)
    else:
        strings = []
        for item in tag.contents:
            if isinstance(item, (NavigableString, CData)):
                strings.append(item.string)
            elif isinstance(item, Tag):
                # Pass the options down: a bare recursive call would always
                # use alt text and collapse whitespace inside nested tags,
                # whatever the caller asked for
                res = self.tag_to_string(
                    item, use_alt=use_alt,
                    normalize_whitespace=normalize_whitespace)
                if res:
                    strings.append(res)
                elif use_alt:
                    try:
                        strings.append(item['alt'])
                    except KeyError:
                        pass
        ans = ''.join(strings)
    if normalize_whitespace:
        ans = re.sub(r'\s+', ' ', ans)
    return ans
@classmethod
def soup(cls, raw):
    '''
    Parse ``raw`` with :class:`BeautifulSoup` and return the result.
    '''
    parsed = BeautifulSoup(raw)
    return parsed
[docs]
@classmethod
def adeify_images(cls, soup):
    '''
    If your recipe when converted to EPUB has problems with images when
    viewed in Adobe Digital Editions, call this method from within
    :meth:`postprocess_html`.

    Every ``<img>`` loses its ``height``, ``width``, ``border``, ``align``
    and ``style`` attributes and is wrapped, followed by a ``<br>``, in a
    new ``<div>`` that takes its place in the document.

    :param soup: The parsed HTML; it is modified in place
    :return: ``soup``
    '''
    unwanted = ('height', 'width', 'border', 'align', 'style')
    for image in soup.findAll('img'):
        for name in unwanted:
            try:
                del image[name]
            except KeyError:
                pass
        parent = image.parent
        position = parent.contents.index(image)
        image.extract()
        wrapper = soup.new_tag('div')
        line_break = soup.new_tag('br')
        parent.insert(position, wrapper)
        wrapper.append(image)
        wrapper.append(line_break)
    return soup
def internal_postprocess_book(self, oeb, opts, log):
    '''
    Rewrite links that point at the web version of an article contained in
    this download so that they point at the downloaded copy instead.

    Does nothing unless ``self.resolve_internal_links`` is set and
    ``self.article_url_map`` is not empty. A middle dot is appended to the
    text of rewritten links that contain only text, to mark them as local.

    :param oeb: An OEBBook object; the links in its spine items are rewritten
    :param opts: Conversion options (unused here)
    :param log: Logging object
    '''
    if self.resolve_internal_links and self.article_url_map:
        seen = set()
        for item in oeb.spine:
            for a in item.data.xpath('//*[local-name()="a" and @href]'):
                # Leave the link back to the article's source alone
                if a.get('rel') == 'calibre-downloaded-from':
                    continue
                url = a.get('href')
                for curl in self.canonicalize_internal_url(url):
                    articles = self.article_url_map.get(curl)
                    if articles:
                        # Link to the first matching article in numeric order
                        arelpath = min(articles, key=numeric_sort_key)
                        a.set('href', item.relhref(arelpath))
                        if a.text and len(a) == 0:
                            # U+00B7 MIDDLE DOT, written as an escape so it
                            # cannot be garbled by a wrong file encoding
                            a.text = a.text + '\xb7'  # mark as local link
                        if url not in seen:
                            log.debug(f'Resolved internal URL: {url} -> {arelpath}')
                            seen.add(url)
class CustomIndexRecipe(BasicNewsRecipe):
    '''
    A recipe that downloads everything reachable from a custom index
    document supplied by :meth:`custom_index`.
    '''

    def custom_index(self):
        '''
        Return the filesystem path to a custom HTML document that will serve as the index for
        this recipe. The index document will typically contain many `<a href="...">`
        tags that point to resources on the internet that should be downloaded.
        '''
        raise NotImplementedError

    def create_opf(self):
        '''
        Write ``index.opf`` for the contents of the output directory, with
        ``index.html`` as the only spine entry.
        '''
        full_title = self.title + strftime(self.timefmt)
        metadata = MetaInformation(full_title, [__appname__])
        metadata.publisher = __appname__
        metadata.author_sort = __appname__
        creator = OPFCreator(self.output_dir, metadata)
        creator.create_manifest_from_files_in([self.output_dir])
        index_html = os.path.join(self.output_dir, 'index.html')
        creator.create_spine([index_html])
        opf_path = os.path.join(self.output_dir, 'index.opf')
        with open(opf_path, 'wb') as opf_file:
            creator.render(opf_file)

    def download(self):
        '''
        Fetch the custom index document recursively into the output
        directory and create the OPF.

        :return: The value returned by the fetcher's ``start_fetch``
        '''
        index = os.path.abspath(self.custom_index())
        scheme = 'file:' if iswindows else 'file://'
        url = scheme + index
        self.web2disk_options.browser = self.clone_browser(self.browser)
        fetcher = RecursiveFetcher(self.web2disk_options, self.log)
        fetcher.base_dir = fetcher.current_dir = self.output_dir
        fetcher.show_progress = False
        fetched = fetcher.start_fetch(url)
        self.create_opf()
        return fetched
class AutomaticNewsRecipe(BasicNewsRecipe):
    '''
    A :class:`BasicNewsRecipe` with ``auto_cleanup`` switched on.
    '''

    auto_cleanup = True
class CalibrePeriodical(BasicNewsRecipe):
    '''
    Recipe that downloads an already built periodical from the calibre
    Periodicals service instead of fetching articles itself.
    '''

    #: Set this to the slug for the calibre periodical
    calibre_periodicals_slug = None
    LOG_IN = 'https://news.calibre-ebook.com/accounts/login'
    needs_subscription = True
    __author__ = 'calibre Periodicals'

    def get_browser(self):
        '''
        Return a browser that is logged in to the calibre Periodicals service.

        :raises LoginFailed: If the page returned after submitting the login
                             form does not link to ``/my-account``
        '''
        br = BasicNewsRecipe.get_browser(self)
        br.open(self.LOG_IN)
        br.select_form(name='login')
        br['username'] = self.username
        br['password'] = self.password
        raw = br.submit().read()
        # The response body is bytes on Python 3; testing a str against it
        # would raise TypeError instead of detecting a failed login
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8', 'replace')
        if 'href="/my-account"' not in raw:
            raise LoginFailed(
                    _('Failed to log in, check your username and password for'
                    ' the calibre Periodicals service.'))

        return br
    # Marks this get_browser for _fetch_article, which checks for the attribute
    get_browser.is_base_class_implementation = True

    def download(self):
        '''
        Download the archive for ``calibre_periodicals_slug`` and extract it
        into the current directory. The conversion options of the recipe
        shipped in the archive are adopted if that recipe can be compiled.

        :return: Absolute path of ``index.html`` in the current directory
        :raises DownloadDenied: If the server answers with HTTP status 403
        '''
        self.log('Fetching downloaded recipe')
        try:
            raw = self.browser.open_novisit(
                'https://news.calibre-ebook.com/subscribed_files/%s/0/temp.downloaded_recipe'
                % self.calibre_periodicals_slug
                    ).read()
        except Exception as e:
            if hasattr(e, 'getcode') and e.getcode() == 403:
                raise DownloadDenied(
                        _('You do not have permission to download this issue.'
                        ' Either your subscription has expired or you have'
                        ' exceeded the maximum allowed downloads for today.'))
            raise
        f = io.BytesIO(raw)
        from calibre.utils.zipfile import ZipFile
        zf = ZipFile(f)
        try:
            zf.extractall()
        finally:
            # Close the archive even if extraction fails
            zf.close()

        from glob import glob

        from calibre.web.feeds.recipes import compile_recipe
        try:
            # Close the recipe file instead of leaking the handle
            with open(glob('*.recipe')[0], 'rb') as recipe_file:
                recipe = compile_recipe(recipe_file.read())
            self.conversion_options = recipe.conversion_options
        except Exception:
            # Best effort: the download is still usable without the options
            self.log.exception('Failed to compile downloaded recipe')
        return os.path.abspath('index.html')