# Source code for subliminal.providers.tvsubtitles

# -*- coding: utf-8 -*-
import io
import logging
import re
from zipfile import ZipFile

from babelfish import Language, language_converters
from guessit import guessit
from requests import Session

from . import ParserBeautifulSoup, Provider
from .. import __short_version__
from ..cache import EPISODE_EXPIRATION_TIME, SHOW_EXPIRATION_TIME, region
from ..exceptions import ProviderError
from ..score import get_equivalent_release_groups
from ..subtitle import Subtitle, fix_line_ending, guess_matches
from ..utils import sanitize, sanitize_release_group
from ..video import Episode

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Register the 'tvsubtitles' converter with babelfish. The provider later calls
# `Language.fromtvsubtitles(...)`, which presumably relies on this registration
# -- confirm against the babelfish converter documentation.
language_converters.register('tvsubtitles = subliminal.converters.tvsubtitles:TVsubtitlesConverter')

# Pattern for the text of a search suggestion link. It captures:
#   * `series`     - the show title (non-greedy),
#   * `first_year` - the first year of the trailing "(YYYY-YYYY)" range.
# Between the two, an optional " YYYY", " (YYYY)", " (US)" or " (UK)" suffix is
# matched but kept out of the `series` group.
link_re = re.compile(r'^(?P<series>.+?)(?: \(?\d{4}\)?| \((?:US|UK)\))? \((?P<first_year>\d{4})-\d{4}\)$')
# Pattern for hrefs of the form "episode-<digits>.html"; used to pick the rows
# of a season page that link to an episode page.
episode_id_re = re.compile(r'^episode-\d+\.html$')


class TVsubtitlesSubtitle(Subtitle):
    """TVsubtitles Subtitle.

    Holds what the provider scraped for one subtitle: its numeric id, the
    series/season/episode/year it was looked up with, and the free-text
    ``rip`` and ``release`` labels (either may be ``None``).

    """
    provider_name = 'tvsubtitles'

    def __init__(self, language, page_link, subtitle_id, series, season, episode, year, rip, release):
        super(TVsubtitlesSubtitle, self).__init__(language, page_link=page_link)
        # identification of the subtitle on the site
        self.subtitle_id = subtitle_id
        # what the subtitle is for
        self.series = series
        self.season = season
        self.episode = episode
        self.year = year
        # free-text descriptions used for release matching
        self.rip = rip
        self.release = release

    @property
    def id(self):
        """The subtitle id, as a string."""
        return str(self.subtitle_id)

    def get_matches(self, video):
        """Compute which properties of `video` this subtitle matches.

        :param video: the video to compare against.
        :return: names of the matching properties.
        :rtype: set

        """
        found = set()

        # series: compare sanitized names, alternative series names included
        if video.series:
            own_series = sanitize(self.series)
            candidates = [video.series] + video.alternative_series
            if any(sanitize(candidate) == own_series for candidate in candidates):
                found.add('series')

        # season
        if video.season and video.season == self.season:
            found.add('season')

        # episode
        if video.episode and video.episode == self.episode:
            found.add('episode')

        # year: an original series matches a subtitle without year, otherwise
        # the years have to be equal
        if video.original_series and self.year is None:
            found.add('year')
        elif video.year and video.year == self.year:
            found.add('year')

        # release_group: any equivalent group contained in the release text
        if video.release_group and self.release:
            equivalents = get_equivalent_release_groups(sanitize_release_group(video.release_group))
            own_release = sanitize_release_group(self.release)
            if any(group in own_release for group in equivalents):
                found.add('release_group')

        # other properties, guessed from the release and rip texts
        if self.release:
            release_guess = guessit(self.release, {'type': 'episode'})
            found.update(guess_matches(video, release_guess, partial=True))
        if self.rip:
            rip_guess = guessit(self.rip)
            found.update(guess_matches(video, rip_guess, partial=True))

        return found
class TVsubtitlesProvider(Provider):
    """TVsubtitles Provider.

    Scrapes the pages under :attr:`server_url` through a
    :class:`requests.Session`: the search page to resolve a show id, the
    season page to resolve episode ids, the episode page to list subtitles
    and the download page to fetch a zipped subtitle.

    """
    languages = {Language('por', 'BR')} | {Language(code) for code in [
        'ara', 'bul', 'ces', 'dan', 'deu', 'ell', 'eng', 'fin', 'fra', 'hun', 'ita', 'jpn', 'kor', 'nld', 'pol', 'por',
        'ron', 'rus', 'spa', 'swe', 'tur', 'ukr', 'zho'
    ]}
    video_types = (Episode,)
    server_url = 'http://www.tvsubtitles.net/'
    subtitle_class = TVsubtitlesSubtitle

    def __init__(self):
        # the HTTP session is only created by initialize()
        self.session = None

    def initialize(self):
        """Create the HTTP session and set the Subliminal User-Agent header."""
        self.session = Session()
        self.session.headers['User-Agent'] = 'Subliminal/%s' % __short_version__

    def terminate(self):
        """Close the HTTP session."""
        self.session.close()

    @region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME)
    def search_show_id(self, series, year=None):
        """Search the show id from the `series` and `year`.

        :param str series: series of the episode.
        :param year: year of the series, if any.
        :type year: int
        :return: the show id, if any.
        :rtype: int
        :raises requests.HTTPError: if the search request returns an error status.

        """
        # make the search
        logger.info('Searching show id for %r', series)
        r = self.session.post(self.server_url + 'search.php', data={'q': series}, timeout=10)
        r.raise_for_status()

        # get the series out of the suggestions
        soup = ParserBeautifulSoup(r.content, ['lxml', 'html.parser'])
        show_id = None
        for suggestion in soup.select('div.left li div a[href^="/tvshow-"]'):
            match = link_re.match(suggestion.text)
            if not match:
                logger.error('Failed to match %s', suggestion.text)
                continue

            if match.group('series').lower() == series.lower():
                if year is not None and int(match.group('first_year')) != year:
                    logger.debug('Year does not match')
                    continue

                # strip the 8-character prefix ("/tvshow-") and the 5-character
                # suffix (presumably ".html") to keep only the numeric id
                show_id = int(suggestion['href'][8:-5])
                logger.debug('Found show id %d', show_id)
                break

        return show_id

    @region.cache_on_arguments(expiration_time=EPISODE_EXPIRATION_TIME)
    def get_episode_ids(self, show_id, season):
        """Get episode ids from the show id and the season.

        :param int show_id: show id.
        :param int season: season of the episode.
        :return: episode ids per episode number.
        :rtype: dict
        :raises requests.HTTPError: if the season page request returns an error status.

        """
        # get the page of the season of the show
        logger.info('Getting the page of show id %d, season %d', show_id, season)
        r = self.session.get(self.server_url + 'tvshow-%d-%d.html' % (show_id, season), timeout=10)
        # fail on HTTP errors, like the other requests of this provider, so
        # that an error page is not parsed into an empty mapping which would
        # then be returned through the caching decorator
        r.raise_for_status()
        soup = ParserBeautifulSoup(r.content, ['lxml', 'html.parser'])

        # loop over episode rows
        episode_ids = {}
        for row in soup.select('table#table5 tr'):
            # skip rows that do not have a link to the episode page
            if not row('a', href=episode_id_re):
                continue

            # extract data from the cells
            cells = row('td')
            # first cell text is split on 'x'; the part after it is the episode number
            episode = int(cells[0].text.split('x')[1])
            # href matches "episode-<digits>.html": drop "episode-" and ".html"
            episode_id = int(cells[1].a['href'][8:-5])
            episode_ids[episode] = episode_id

        if episode_ids:
            logger.debug('Found episode ids %r', episode_ids)
        else:
            logger.warning('No episode ids found')

        return episode_ids

    def query(self, show_id, series, season, episode, year=None):
        """List the subtitles of one episode.

        :param int show_id: show id, as returned by :meth:`search_show_id`.
        :param str series: series name stored on the resulting subtitles.
        :param int season: season of the episode.
        :param int episode: episode number.
        :param year: year stored on the resulting subtitles, if any.
        :type year: int
        :return: the subtitles found, empty if the episode is unknown.
        :rtype: list
        :raises requests.HTTPError: if a page request returns an error status.

        """
        # get the episode ids
        episode_ids = self.get_episode_ids(show_id, season)
        if episode not in episode_ids:
            logger.error('Episode %d not found', episode)
            return []

        # get the episode page
        logger.info('Getting the page for episode %d', episode_ids[episode])
        r = self.session.get(self.server_url + 'episode-%d.html' % episode_ids[episode], timeout=10)
        # do not try to parse an HTTP error page as a subtitle listing
        r.raise_for_status()
        soup = ParserBeautifulSoup(r.content, ['lxml', 'html.parser'])

        # loop over subtitles rows
        subtitles = []
        for row in soup.select('.subtitlen'):
            # read the item
            # language code is cut out of the flag image path (13-character
            # prefix and 4-character suffix removed)
            language = Language.fromtvsubtitles(row.h5.img['src'][13:-4])
            # numeric id is cut out of the parent link (10-character prefix and
            # 5-character suffix removed)
            subtitle_id = int(row.parent['href'][10:-5])
            page_link = self.server_url + 'subtitle-%d.html' % subtitle_id
            rip = row.find('p', title='rip').text.strip() or None
            release = row.find('h5').text.strip() or None

            subtitle = self.subtitle_class(language, page_link, subtitle_id, series, season, episode, year, rip,
                                           release)
            logger.debug('Found subtitle %s', subtitle)
            subtitles.append(subtitle)

        return subtitles

    def list_subtitles(self, video, languages):
        """List the subtitles available for `video` in the given `languages`.

        The show id is searched with the series name first, then with each
        alternative series name until one is found.

        :param video: the episode to list subtitles for.
        :param languages: languages to keep.
        :return: the matching subtitles, empty if none or if no show id is found.
        :rtype: list

        """
        # lookup show_id
        titles = [video.series] + video.alternative_series
        show_id = None
        for title in titles:
            show_id = self.search_show_id(title, video.year)
            if show_id is not None:
                break

        # query for subtitles with the show_id
        if show_id is not None:
            # `title` is the name that produced the show id
            subtitles = [s for s in self.query(show_id, title, video.season, video.episode, video.year)
                         if s.language in languages and s.episode == video.episode]
            if subtitles:
                return subtitles
        else:
            logger.error('No show id found for %r (%r)', video.series, {'year': video.year})

        return []

    def download_subtitle(self, subtitle):
        """Download the zip of `subtitle` and store its single file as content.

        :param subtitle: the subtitle to download; its ``content`` is set.
        :raises requests.HTTPError: if the download request returns an error status.
        :raises ProviderError: if the archive does not contain exactly one file.

        """
        # download as a zip
        logger.info('Downloading subtitle %r', subtitle)
        r = self.session.get(self.server_url + 'download-%d.html' % subtitle.subtitle_id, timeout=10)
        r.raise_for_status()

        # open the zip
        with ZipFile(io.BytesIO(r.content)) as zf:
            names = zf.namelist()
            # an empty archive would otherwise surface as a bare IndexError
            if not names:
                raise ProviderError('No file to unzip')
            if len(names) > 1:
                raise ProviderError('More than one file to unzip')

            subtitle.content = fix_line_ending(zf.read(names[0]))