__license__ = 'GPL v3'
__copyright__ = '2009, John Schember <john at nachtimwald.com>'
__docformat__ = 'restructuredtext en'
'''
Generic USB Mass storage device driver. This is not a complete stand alone
driver. It is intended to be subclassed with the relevant parts implemented
for a particular device.
'''
import json
import os
import shutil
from itertools import cycle
from calibre import fsync, isbytestring, prints
from calibre.constants import filesystem_encoding, ismacos, numeric_version
from calibre.devices.usbms.books import Book, BookList
from calibre.devices.usbms.cli import CLI
from calibre.devices.usbms.device import Device
from calibre.ebooks.metadata.book.json_codec import JsonCodec
from calibre.prints import debug_print
from polyglot.builtins import itervalues, string_or_bytes
def safe_walk(top, topdown=True, onerror=None, followlinks=False, maxdepth=128):
    '''
    A replacement for os.walk that does not die when it encounters
    undecodeable filenames in a linux filesystem.

    Yields ``(top, dirs, nondirs)`` tuples like os.walk. Names returned as
    bytes by os.listdir are decoded with the filesystem encoding; names that
    cannot be decoded are logged and skipped.

    :param top: Directory to start walking from.
    :param topdown: If True a directory is yielded before its
        sub-directories, otherwise after them.
    :param onerror: Optional callable invoked with the OSError raised when
        a directory cannot be listed. That directory is then skipped.
    :param followlinks: If True, symlinked directories are descended into.
    :param maxdepth: How many levels below ``top`` may be visited. A
        negative value yields nothing.
    '''
    if maxdepth < 0:
        return

    # We may lack read permission for top. Like os.walk, report the problem
    # through onerror (if any) and carry on instead of aborting a walk that
    # may still have many readable directories left to visit.
    try:
        entries = os.listdir(top)
    except OSError as err:
        if onerror is not None:
            onerror(err)
        return

    dirs, nondirs = [], []
    for entry in entries:
        if isinstance(entry, bytes):
            try:
                entry = entry.decode(filesystem_encoding)
            except UnicodeDecodeError:
                debug_print('Skipping undecodeable file: %r' % entry)
                continue
        bucket = dirs if os.path.isdir(os.path.join(top, entry)) else nondirs
        bucket.append(entry)

    if topdown:
        yield top, dirs, nondirs

    # dirs is iterated only after the top-down yield, so a caller may prune
    # it in place to control the descent.
    for subdir in dirs:
        child = os.path.join(top, subdir)
        if not followlinks and os.path.islink(child):
            continue
        yield from safe_walk(child, topdown, onerror, followlinks, maxdepth - 1)

    if not topdown:
        yield top, dirs, nondirs
# CLI must come before Device in the list of base classes, because it
# implements the CLI functions that are inherited from the device interface
# in Device.
class USBMS(CLI, Device):
    '''
    The base class for all USBMS devices. Implements the logic for
    sending/getting/updating metadata/caching metadata/etc.

    Drivers for particular devices subclass this and override the class
    attributes and hook methods defined here.
    '''

    description = _('Communicate with an e-book reader.')
    author = 'John Schember'
    supported_platforms = ['windows', 'osx', 'linux']

    # Store type instances of BookList and Book. We must do this because
    # a) we need to override these classes in some device drivers, and
    # b) the classmethods seem only to see real attributes declared in the
    #    class, not attributes stored in the class
    booklist_class = BookList
    book_class = Book

    # File formats scanned for, in addition to the configured format map
    # (see formats_to_scan_for).
    FORMATS = []
    # Not used in this class. NOTE(review): presumably consulted by
    # subclasses or the device framework -- confirm.
    CAN_SET_METADATA = []
    # Name of the metadata cache file kept at the root of each storage prefix
    METADATA_CACHE = 'metadata.calibre'
    # Name of the drive information file kept at the root of each prefix
    DRIVEINFO = 'driveinfo.calibre'

    # When True, books() scans from the storage root instead of the
    # configured e-book directories.
    SCAN_FROM_ROOT = False

    def _update_driveinfo_record(self, dinfo, prefix, location_code, name=None):
        '''
        Refresh the drive information record of one storage location.

        :param dinfo: The existing record. Anything that is not a dict is
            replaced by a new, empty dict.
        :param prefix: Path prefix of the storage location. Stored with
            backslashes converted to forward slashes.
        :param location_code: Stored under the ``location_code`` key.
        :param name: If not None, replaces the stored device name.
        :return: The updated record.
        '''
        import uuid

        from calibre.utils.date import isoformat, now

        if not isinstance(dinfo, dict):
            dinfo = {}
        # The store UUID and device name are created once and then preserved
        if dinfo.get('device_store_uuid') is None:
            dinfo['device_store_uuid'] = str(uuid.uuid4())
        if dinfo.get('device_name') is None:
            dinfo['device_name'] = self.get_gui_name()
        if name is not None:
            dinfo['device_name'] = name
        dinfo['location_code'] = location_code
        dinfo['last_library_uuid'] = getattr(self, 'current_library_uuid', None)
        dinfo['calibre_version'] = '.'.join(map(str, numeric_version))
        dinfo['date_last_connected'] = isoformat(now())
        dinfo['prefix'] = prefix.replace('\\', '/')
        return dinfo

    def _update_driveinfo_file(self, prefix, location_code, name=None):
        '''
        Read (if present), refresh and rewrite the drive information file
        stored at the root of ``prefix``.

        An existing file that cannot be parsed is treated as if it were
        missing, i.e. a fresh record is written.

        :param prefix: Path prefix of the storage location.
        :param location_code: Passed through to the record.
        :param name: Optional new device name, passed through to the record.
        :return: The record that was written.
        '''
        from calibre.utils.config import from_json, to_json

        dinfo_path = os.path.join(prefix, self.DRIVEINFO)
        driveinfo = None
        if os.path.exists(dinfo_path):
            with open(dinfo_path, 'rb') as f:
                try:
                    driveinfo = json.loads(f.read(), object_hook=from_json)
                except Exception:
                    # Unreadable/corrupt file: start over with a new record
                    driveinfo = None
        # A non-dict value (including None) is turned into a fresh record
        driveinfo = self._update_driveinfo_record(driveinfo, prefix,
                                                  location_code, name)
        data = json.dumps(driveinfo, default=to_json)
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        with open(dinfo_path, 'wb') as f:
            f.write(data)
            fsync(f)
        return driveinfo

    def set_driveinfo_name(self, location_code, name):
        '''
        Store ``name`` as the device name in the drive information file of
        the storage identified by ``location_code`` (``'main'``, ``'A'`` or
        ``'B'``). Any other location code is ignored.
        '''
        # Map to attribute names so that only the selected prefix is read
        prefix_attr = {
            'main': '_main_prefix',
            'A': '_card_a_prefix',
            'B': '_card_b_prefix',
        }.get(location_code)
        if prefix_attr is not None:
            self._update_driveinfo_file(getattr(self, prefix_attr),
                                        location_code, name)

    def formats_to_scan_for(self):
        'Return the set of formats books() looks for: the configured format map plus FORMATS'
        return set(self.settings().format_map) | set(self.FORMATS)

    def is_allowed_book_file(self, filename, path, prefix):
        '''
        Hook used by books() to filter files that have a matching format.
        The default implementation accepts every file.
        '''
        return True

    def books(self, oncard=None, end_session=True):
        '''
        Return the list of books stored on the device.

        The metadata cache of the selected storage is loaded and then
        reconciled with the files actually present: new files are added,
        changed files have their metadata refreshed and entries whose file
        has vanished are removed. If anything changed, the cache is written
        back via sync_booklists().

        :param oncard: ``None`` (or any falsy value) for main memory,
            ``'carda'`` or ``'cardb'`` for the storage cards. For a card
            that is not present, or any other value, an empty book list is
            returned.
        :param end_session: Not used by this implementation.
        :return: An instance of ``booklist_class``.
        '''
        from calibre.ebooks.metadata.meta import path_to_ext

        progress_msg = _('Getting list of books on device...')
        debug_print('USBMS: Fetching list of books from device. Device=',
                    self.__class__.__name__,
                    'oncard=', oncard)

        dummy_bl = self.booklist_class(None, None, None)

        card_unavailable = (
            (oncard == 'carda' and not self._card_a_prefix) or
            (oncard == 'cardb' and not self._card_b_prefix) or
            (oncard and oncard not in ('carda', 'cardb'))
        )
        if card_unavailable:
            self.report_progress(1.0, progress_msg)
            return dummy_bl

        if oncard == 'carda':
            prefix = self._card_a_prefix
            ebook_dirs = self.get_carda_ebook_dir()
        elif oncard == 'cardb':
            prefix = self._card_b_prefix
            ebook_dirs = self.EBOOK_DIR_CARD_B
        else:
            prefix = self._main_prefix
            ebook_dirs = self.get_main_ebook_dir()

        debug_print('USBMS: dirs are:', prefix, ebook_dirs)

        # get the metadata cache
        bl = self.booklist_class(oncard, prefix, self.settings)
        need_sync = self.parse_metadata_cache(bl, prefix, self.METADATA_CACHE)

        # make a dict cache of paths so the lookup in the loop below is
        # faster. Values are indices into bl; an entry is set to None once
        # the corresponding file has been seen on the filesystem.
        bl_cache = {b.lpath: idx for idx, b in enumerate(bl)}

        all_formats = self.formats_to_scan_for()

        def update_booklist(filename, path, prefix):
            'Add or refresh the entry for one file. Returns True if bl changed.'
            # Ignore AppleDouble files
            if filename.startswith('._'):
                return False
            if (path_to_ext(filename) not in all_formats or
                    not self.is_allowed_book_file(filename, path, prefix)):
                return False
            changed = False
            try:
                # lpath is the path relative to prefix, with / separators
                lpath = os.path.join(path, filename).partition(self.normalize_path(prefix))[2]
                if lpath.startswith(os.sep):
                    lpath = lpath[len(os.sep):]
                lpath = lpath.replace('\\', '/')
                idx = bl_cache.get(lpath)
                if idx is not None:
                    # Already cached: mark as seen and refresh if needed
                    bl_cache[lpath] = None
                    if self.update_metadata_item(bl[idx]):
                        changed = True
                elif bl.add_book(self.book_from_path(prefix, lpath),
                                 replace_metadata=False):
                    changed = True
            except Exception:  # Probably a filename encoding error
                import traceback
                traceback.print_exc()
            return changed

        if isinstance(ebook_dirs, string_or_bytes):
            ebook_dirs = [ebook_dirs]
        for ebook_dir in ebook_dirs:
            ebook_dir = self.path_to_unicode(ebook_dir)
            if self.SCAN_FROM_ROOT:
                ebook_dir = self.normalize_path(prefix)
            else:
                ebook_dir = self.normalize_path(
                    os.path.join(prefix, *(ebook_dir.split('/')))
                    if ebook_dir else prefix)
            debug_print('USBMS: scan from root', self.SCAN_FROM_ROOT, ebook_dir)
            if not os.path.exists(ebook_dir):
                continue
            # Get all books in the ebook_dir directory
            if self.SUPPORTS_SUB_DIRS or self.SUPPORTS_SUB_DIRS_FOR_SCAN:
                # build a list of files to check, so we can accurately
                # report progress
                flist = [
                    (self.path_to_unicode(filename), self.path_to_unicode(path))
                    for path, _dirs, files in safe_walk(ebook_dir)
                    for filename in files
                    if filename != self.METADATA_CACHE
                ]
                for i, (filename, path) in enumerate(flist):
                    self.report_progress(i / float(len(flist)), progress_msg)
                    if update_booklist(filename, path, prefix):
                        need_sync = True
            else:
                paths = os.listdir(ebook_dir)
                for i, filename in enumerate(paths):
                    self.report_progress((i + 1) / float(len(paths)), progress_msg)
                    if update_booklist(self.path_to_unicode(filename), ebook_dir, prefix):
                        need_sync = True

        # Remove books that are no longer in the filesystem. Cache contains
        # indices into the booklist if book not in filesystem, None otherwise
        # Do the operation in reverse order so indices remain valid
        stale = sorted(
            (idx for idx in itervalues(bl_cache) if idx is not None),
            reverse=True)
        for idx in stale:
            need_sync = True
            del bl[idx]

        debug_print('USBMS: count found in cache: %d, count of files in metadata: %d, need_sync: %s' %
            (len(bl_cache), len(bl), need_sync))
        if need_sync:
            if oncard == 'cardb':
                self.sync_booklists((None, None, bl))
            elif oncard == 'carda':
                self.sync_booklists((None, bl, None))
            else:
                self.sync_booklists((bl, None, None))

        self.report_progress(1.0, progress_msg)
        debug_print('USBMS: Finished fetching list of books from device. oncard=', oncard)
        return bl

    def upload_books(self, files, names, on_card=None, end_session=True,
                     metadata=None):
        '''
        Copy books to the device.

        :param files: Paths or file-like objects (anything with a ``read``
            attribute) of the books to send.
        :param names: Iterable of file names, one per entry in ``files``.
        :param on_card: Storage selector, passed to ``_sanity_check`` and
            echoed back in the result.
        :param end_session: Not used by this implementation.
        :param metadata: Iterable of metadata objects, one per entry in
            ``files``. Although it defaults to None, an iterable must be
            supplied; None raises TypeError.
        :return: A list of ``(path_on_device, on_card)`` tuples.
        '''
        progress_msg = _('Transferring books to device...')
        debug_print('USBMS: uploading %d books'%(len(files)))

        path = self._sanity_check(on_card, files)

        paths = []
        names = iter(names)
        metadata = iter(metadata)

        for i, infile in enumerate(files):
            mdata, fname = next(metadata), next(names)
            filepath = self.normalize_path(self.create_upload_path(path, mdata, fname))
            if not hasattr(infile, 'read'):
                infile = self.normalize_path(infile)
            filepath = self.put_file(infile, filepath, replace_file=True)
            paths.append(filepath)
            try:
                self.upload_cover(os.path.dirname(filepath),
                                  os.path.splitext(os.path.basename(filepath))[0],
                                  mdata, filepath)
            except Exception:  # Failure to upload cover is not catastrophic
                import traceback
                traceback.print_exc()

            self.report_progress((i + 1) / float(len(files)), progress_msg)

        self.report_progress(1.0, progress_msg)
        debug_print('USBMS: finished uploading %d books'%(len(files)))
        return [(p, on_card) for p in paths]

    def upload_cover(self, path, filename, metadata, filepath):
        '''
        Upload book cover to the device. Default implementation does nothing.

        :param path: The full path to the folder where the associated book is located.
        :param filename: The name of the book file without the extension.
        :param metadata: metadata belonging to the book. Use metadata.thumbnail
                         for cover
        :param filepath: The full path to the e-book file
        '''

    def delete_single_book(self, path):
        'Delete the book file at path'
        os.unlink(path)

    def delete_extra_book_files(self, path):
        '''
        Delete the companion files/folders of the book at ``path``.

        For every extension in ``DELETE_EXTS`` both ``<path without
        extension><ext>`` and ``<path><ext>`` are removed if they exist.
        When sub-directories are supported, the now possibly empty parent
        directories are pruned as well.
        '''
        filepath = os.path.splitext(path)[0]
        for ext in self.DELETE_EXTS:
            for base in (filepath, path):
                candidate = base + ext
                if not os.path.exists(candidate):
                    continue
                if os.path.isdir(candidate):
                    shutil.rmtree(candidate, ignore_errors=True)
                else:
                    os.unlink(candidate)

        if self.SUPPORTS_SUB_DIRS:
            try:
                os.removedirs(os.path.dirname(path))
            except OSError:
                # Best effort: the directory is not empty or cannot be removed
                pass

    def delete_books(self, paths, end_session=True):
        '''
        Delete the books at ``paths`` from the device, together with their
        companion files.

        :param paths: Paths of the book files to delete.
        :param end_session: Not used by this implementation.
        '''
        progress_msg = _('Removing books from device...')
        debug_print('USBMS: deleting %d books'%(len(paths)))
        for i, path in enumerate(paths):
            self.report_progress((i + 1) / float(len(paths)), progress_msg)
            path = self.normalize_path(path)
            if os.path.exists(path):
                # Delete the ebook
                self.delete_single_book(path)

            # NOTE(review): the cleanup of companion files is assumed to run
            # even when the book file itself is already gone -- confirm.
            self.delete_extra_book_files(path)

        self.report_progress(1.0, progress_msg)
        debug_print('USBMS: finished deleting %d books'%(len(paths)))

    # If you override this method and you use book._new_book, then you must
    # complete the processing before you call this method. The flag is cleared
    # at the end just before the return
    def sync_booklists(self, booklists, end_session=True):
        '''
        Write the metadata caches to the device.

        :param booklists: Sequence of up to three book lists for main
            memory, card A and card B, in that order. Entries that are not
            instances of ``booklist_class`` (for example None) are not
            written.
        :param end_session: Not used by this implementation.
        '''
        debug_print('USBMS: starting sync_booklists')

        json_codec = JsonCodec()

        main_dir = self.normalize_path(self._main_prefix)
        if not os.path.exists(main_dir):
            os.makedirs(main_dir)

        def write_prefix(prefix, listid):
            'Write booklists[listid] to the metadata cache under prefix, if applicable'
            if (prefix is None or len(booklists) <= listid or
                    not isinstance(booklists[listid], self.booklist_class)):
                return
            # Test and create the same (normalized) path, so that an
            # existing directory is never passed to makedirs
            native_prefix = self.normalize_path(prefix)
            if not os.path.exists(native_prefix):
                os.makedirs(native_prefix)
            with open(self.normalize_path(os.path.join(prefix, self.METADATA_CACHE)), 'wb') as f:
                json_codec.encode_to_file(f, booklists[listid])
                fsync(f)

        write_prefix(self._main_prefix, 0)
        write_prefix(self._card_a_prefix, 1)
        write_prefix(self._card_b_prefix, 2)

        # Clear the _new_book indication, as we are supposed to be done with
        # adding books at this point
        for blist in booklists:
            if blist is not None:
                for book in blist:
                    book._new_book = False

        self.report_progress(1.0, _('Sending metadata to device...'))
        debug_print('USBMS: finished sync_booklists')

    @classmethod
    def build_template_regexp(cls):
        'Return the regular expression built from the save template of this driver'
        from calibre.devices.utils import build_template_regexp
        return build_template_regexp(cls.save_template())

    @classmethod
    def path_to_unicode(cls, path):
        'Return path decoded with the filesystem encoding if it is a bytestring, unchanged otherwise'
        if isbytestring(path):
            path = path.decode(filesystem_encoding)
        return path

    @classmethod
    def normalize_path(cls, path):
        'Return path with platform native path separators'
        if path is None:
            return None
        # Decode first: replacing str separators in a bytes path raises
        # TypeError
        path = cls.path_to_unicode(path)
        if os.sep == '\\':
            return path.replace('/', '\\')
        return path.replace('\\', '/')

    @classmethod
    def parse_metadata_cache(cls, bl, prefix, name):
        '''
        Load the metadata cache file ``name`` under ``prefix`` into ``bl``.

        :param bl: The book list to fill. It is emptied again if loading
            raises an exception.
        :param prefix: Path prefix of the storage location.
        :param name: File name of the cache, relative to ``prefix``.
        :return: True if the cache was missing, unreadable or failed to
            load, i.e. it has to be rewritten. False otherwise.
        '''
        json_codec = JsonCodec()
        need_sync = False
        cache_file = cls.normalize_path(os.path.join(prefix, name))
        if not os.access(cache_file, os.R_OK):
            return True
        try:
            with open(cache_file, 'rb') as f:
                json_codec.decode_from_file(f, bl, cls.book_class, prefix)
        except Exception:
            import traceback
            traceback.print_exc()
            # Discard partial results in place. Rebinding the local name
            # would leave the caller's list untouched.
            del bl[:]
            need_sync = True
        return need_sync

    @classmethod
    def update_metadata_item(cls, book):
        '''
        Re-read the metadata of ``book`` if the size of its file differs
        from the cached size.

        :return: True if the book was updated, False otherwise.
        '''
        size = os.stat(cls.normalize_path(book.path)).st_size
        if size == book.size:
            return False
        mi = cls.metadata_from_path(book.path)
        book.smart_update(mi)
        book.size = size
        return True

    @classmethod
    def metadata_from_path(cls, path):
        'Return the metadata read from the single file at path'
        return cls.metadata_from_formats([path])

    @classmethod
    def metadata_from_formats(cls, fmts):
        '''
        Return the metadata read from the files in ``fmts``, using quick
        metadata mode and the regular expression of the save template.
        '''
        from calibre.customize.ui import quick_metadata
        from calibre.ebooks.metadata.meta import metadata_from_formats
        with quick_metadata:
            return metadata_from_formats(fmts, force_read_metadata=True,
                                         pattern=cls.build_template_regexp())

    @classmethod
    def book_from_path(cls, prefix, lpath):
        '''
        Create a ``book_class`` instance for the file ``lpath`` under
        ``prefix``.

        Metadata is read from the file itself when the ``read_metadata``
        setting or ``MUST_READ_METADATA`` is set, otherwise it is derived
        from the file name. If no metadata is obtained, the file name
        without extension is used as title with an unknown author.
        '''
        from calibre.ebooks.metadata.book.base import Metadata

        full_path = cls.normalize_path(os.path.join(prefix, lpath))
        if cls.settings().read_metadata or cls.MUST_READ_METADATA:
            mi = cls.metadata_from_path(full_path)
        else:
            from calibre.ebooks.metadata.meta import metadata_from_filename
            mi = metadata_from_filename(cls.normalize_path(os.path.basename(lpath)),
                                        cls.build_template_regexp())
        if mi is None:
            mi = Metadata(os.path.splitext(os.path.basename(lpath))[0],
                          [_('Unknown')])
        size = os.stat(full_path).st_size
        return cls.book_class(prefix, lpath, other=mi, size=size)