"""
Module `provider` provides the core logic behind providers.
"""
import base64
import json
import logging
import sys
import threading
import time
import xbmc
import xbmcaddon
import xbmcgui
from .kodi import ADDON_ID, ADDON_NAME, get_installed_addons, notify_all, set_logger, run_script
from .utils import bytes_to_str, str_to_bytes
# Names exported by ``from <this module> import *``; everything else defined
# here (helper functions, ProviderListener) must be imported explicitly.
__all__ = ["ProviderResult", "Provider"]
def get_providers():
    """
    List the ids of the provider add-ons.

    Queries :func:`get_installed_addons` for add-ons of type ``xbmc.python.script``
    with ``enabled=True`` and keeps the ids that start with ``script.flix.``,
    leaving out ``ADDON_ID`` itself.

    :return: The matching add-on ids, in the order they were reported.
    :rtype: list[str]
    """
    provider_ids = []
    # Each reported entry is a pair; only its first element (the id) is used.
    for addon_id, _ in get_installed_addons(addon_type="xbmc.python.script", enabled=True):
        if addon_id.startswith("script.flix.") and addon_id != ADDON_ID:
            provider_ids.append(addon_id)
    return provider_ids
def send_to_providers(providers, method, *args, **kwargs):
    """
    Ask every provider in `providers` to run `method`.

    The call arguments are serialised to JSON and base64-encoded once, then each
    provider is started through :func:`run_script` with ``ADDON_ID`` (the sender),
    the method name and the encoded payload.

    :param providers: Iterable of provider add-on ids.
    :param method: Name of the provider method to call.
    :type method: str
    :param args: Positional arguments for the method (stored under ``"args"``).
    :param kwargs: Keyword arguments for the method (stored under ``"kwargs"``).
    """
    # Empty positional/keyword collections are left out of the payload entirely.
    payload = {key: value for key, value in (("args", args), ("kwargs", kwargs)) if value}
    serialized = json.dumps(payload)
    encoded = base64.b64encode(str_to_bytes(serialized))
    data_b64 = bytes_to_str(encoded)
    for provider in providers:
        run_script(provider, ADDON_ID, method, data_b64)
def send_to_provider(provider, method, *args, **kwargs):
    """
    Ask a single provider to run `method`.

    Convenience wrapper around :func:`send_to_providers` for one provider.

    :param provider: The provider add-on id.
    :param method: Name of the provider method to call.
    :type method: str
    :param args: Positional arguments for the method.
    :param kwargs: Keyword arguments for the method.
    """
    single_provider = (provider,)
    send_to_providers(single_provider, method, *args, **kwargs)
def _setter_and_getter(attribute):
def setter(self, value):
self[attribute] = value
def getter(self):
return self.get(attribute)
setter.__doc__ = "Set {}".format(attribute)
getter.__doc__ = "Get {}".format(attribute)
return setter, property(getter)
# [docs] -- Sphinx "view source" link marker left over from extraction; not part of the code.
class ProviderResult(dict):
    # noinspection PyUnresolvedReferences
    """
    Dictionary-based container describing a single search result.

    It is the only type allowed to be returned by the search methods of a provider:
    :func:`Provider.search`, :func:`Provider.search_movie`, :func:`Provider.search_show`,
    :func:`Provider.search_season` and :func:`Provider.search_episode`.

    Every field is stored as a regular dictionary item. Each field can be read through
    the property of the same name (``None`` when it was never set) and changed through
    the matching ``set_<field>`` method.

    :keyword label: The result label.
    :type label: str, optional
    :keyword label2: The result label2.
    :type label2: str, optional
    :keyword icon: The result icon. If not set, the fallback icon will be used.
    :type icon: str, optional
    :keyword url: The result url. If not set, then `provider_data` must be set, otherwise the result is discarded.
    :type url: str, optional
    :keyword provider_data: Data to be sent to provider when :func:`Provider.resolve` is called.
    :type provider_data: any, optional

    Example::

        # Start by creating the result with some values
        result = ProviderResult(
            label="label",
            label2="label2",
            icon="foo/icon.png",
            url="http://foo.bar/video.mp4",
        )

        # Values can be modified afterwards
        result.set_label2("new label2")
    """

    # One (setter method, read-only property) pair per supported field.
    set_label, label = _setter_and_getter("label")
    set_label2, label2 = _setter_and_getter("label2")
    set_icon, icon = _setter_and_getter("icon")
    set_url, url = _setter_and_getter("url")
    set_provider_data, provider_data = _setter_and_getter("provider_data")
# [docs] -- Sphinx "view source" link marker left over from extraction; not part of the code.
class Provider(object):
    """
    Base class holding all the logic behind a provider.

    Subclasses must implement the search methods (:func:`search`, :func:`search_movie`,
    :func:`search_show`, :func:`search_season` and :func:`search_episode`) and may
    optionally implement :func:`resolve`. Once instantiated, the provider is run by
    calling :func:`register`.

    Example::

        class CustomProvider(Provider):
            def search(self, query):
                # Implementation here
                return []

            def search_movie(self, tmdb_id, title, titles, year=None):
                return self.search("{:s} {:d}".format(title, year) if year else title)

            def search_show(self, tmdb_id, show_title, titles, year=None):
                return self.search("{:s} {:d}".format(show_title, year) if year else show_title)

            def search_season(self, tmdb_id, show_title, season_number, titles):
                return self.search("{:s} S{:02d}".format(show_title, season_number))

            def search_episode(self, tmdb_id, show_title, season_number, episode_number, titles):
                return self.search("{:s} S{:02d}E{:02d}".format(show_title, season_number, episode_number))

        CustomProvider().register()
    """

    def __init__(self):
        """
        Set up the logger and the table of callable methods.

        Every public callable attribute of the instance (anything not starting with an
        underscore), except ``register``, becomes invocable by name through :func:`register`.
        """
        self.logger = set_logger("provider")
        self._methods = {}
        public_names = (
            candidate for candidate in dir(self)
            if not candidate.startswith("_") and candidate != "register"
        )
        for name in public_names:
            member = getattr(self, name)
            if callable(member):
                self._methods[name] = member

    def search(self, query):
        """
        Perform a raw search.

        :param query: The query for performing the search.
        :type query: str
        :return: List of search results.
        :rtype: list[ProviderResult]
        :raises NotImplementedError: Always, unless overridden by the subclass.
        """
        raise NotImplementedError("'search' method must be implemented")

    def search_movie(self, tmdb_id, title, titles, year=None):
        """
        Search a movie.

        :param tmdb_id: The movie TMDB id.
        :type tmdb_id: str
        :param title: The movie title.
        :type title: str
        :param titles: Dictionary containing key-pairs of country (ISO-3166-1 lowercase) and title, respectively.
        :type titles: dict[str, str]
        :param year: The movie release year. This is optional, as some movies don't have a release date attribute.
        :type year: int, optional
        :return: List of search results.
        :rtype: list[ProviderResult]
        :raises NotImplementedError: Always, unless overridden by the subclass.
        """
        raise NotImplementedError("'search_movie' method must be implemented")

    def search_show(self, tmdb_id, show_title, titles, year=None):
        """
        Search a tv show.

        :param tmdb_id: The tv show TMDB id.
        :type tmdb_id: str
        :param show_title: The show title.
        :type show_title: str
        :param titles: Dictionary containing key-pairs of country (ISO-3166-1 lowercase) and title, respectively.
        :type titles: dict[str, str]
        :param year: The show first air year. This is optional, as some shows don't have this attribute.
        :type year: int, optional
        :return: List of search results.
        :rtype: list[ProviderResult]
        :raises NotImplementedError: Always, unless overridden by the subclass.
        """
        raise NotImplementedError("'search_show' method must be implemented")

    def search_season(self, tmdb_id, show_title, season_number, titles):
        """
        Search a season.

        :param tmdb_id: The tv show TMDB id.
        :type tmdb_id: str
        :param show_title: The show title.
        :type show_title: str
        :param season_number: The season number.
        :type season_number: int
        :param titles: Dictionary containing key-pairs of country (ISO-3166-1 lowercase) and title, respectively.
        :type titles: dict[str, str]
        :return: List of search results.
        :rtype: list[ProviderResult]
        :raises NotImplementedError: Always, unless overridden by the subclass.
        """
        raise NotImplementedError("'search_season' method must be implemented")

    def search_episode(self, tmdb_id, show_title, season_number, episode_number, titles):
        """
        Search an episode.

        :param tmdb_id: The tv show TMDB id.
        :type tmdb_id: str
        :param show_title: The show title.
        :type show_title: str
        :param season_number: The season number.
        :type season_number: int
        :param episode_number: The episode number.
        :type episode_number: int
        :param titles: Dictionary containing key-pairs of country (ISO-3166-1 lowercase) and title, respectively.
        :type titles: dict[str, str]
        :return: List of search results.
        :rtype: list[ProviderResult]
        :raises NotImplementedError: Always, unless overridden by the subclass.
        """
        raise NotImplementedError("'search_episode' method must be implemented")

    def resolve(self, provider_data):
        """
        Turn the data attached to a result into a playable url.

        Resolve is only called in cases where the provider has not set
        (:attr:`ProviderResult.url`) but did set (:attr:`ProviderResult.provider_data`),
        which is what gets passed in here. This may be useful in cases where the `url`
        can't be obtained right away.

        :param provider_data: `provider_data` from result (:class:`ProviderResult`).
        :type provider_data: any
        :return: The url to be played.
        :rtype: str
        :raises NotImplementedError: Always, unless overridden by the subclass.
        """
        raise NotImplementedError("'resolve' method must be implemented")

    def ping(self):
        """
        Ping method for checking the communication with a provider.

        :return: The provider id.
        :rtype: str
        """
        self.logger.debug("Ping method called")
        return ADDON_ID

    def register(self):
        """
        Register the provider for execution.

        Reads the sender, the method name and the base64-encoded JSON payload from
        ``sys.argv``, runs the requested method with the decoded arguments and sends
        its return value back with :func:`notify_all`. When the method (or the decoding
        of its arguments) fails, the failure is logged and ``None`` is sent instead.

        :raises ValueError: If the requested method is not one of the callable methods.
        """
        self.logger.debug("Running with args: %s", sys.argv)

        # Expected layout: [script, sender, method, base64 payload].
        if len(sys.argv) != 4:
            self.logger.error("Expecting 4 arguments. Got %s", sys.argv)
            xbmcgui.Dialog().notification(ADDON_NAME, xbmcaddon.Addon("plugin.video.flix").getLocalizedString(30109))
            return

        _, sender, method, data_b64 = sys.argv

        if method not in self._methods:
            self.logger.error("Unknown method provided '%s'. Expecting one of %s", method, self._methods.keys())
            raise ValueError("Unknown method provided")

        try:
            payload = json.loads(base64.b64decode(data_b64))
            value = self._methods[method](*payload.get("args", []), **payload.get("kwargs", {}))
        except Exception as e:
            # Broad on purpose: a reply is still sent below, carrying None.
            self.logger.error("Failed running method '%s' with data '%s': %s", method, data_b64, e, exc_info=True)
            value = None

        delivered = notify_all(ADDON_ID, "{}.{}".format(sender, method), value)
        if not delivered:
            self.logger.error("Failed sending provider data")
class ProviderListener(xbmc.Monitor):
    """
    Monitor that gathers the replies of a set of providers.

    A reply is a notification whose method equals ``"Other.<ADDON_ID>.<method>"`` and
    whose sender is one of the awaited providers; its data is decoded from JSON and
    stored per sender. The listener can be used as a context manager, in which case
    leaving the block waits for the outstanding replies and logs the providers that
    did not answer.
    """

    def __init__(self, providers, method, timeout=10):
        """
        :param providers: Iterable of provider ids to wait for.
        :param method: The method name whose replies are awaited.
        :type method: str
        :param timeout: Default timeout, in seconds, used by :func:`wait`.
            A value that is not greater than zero disables the timeout.
        """
        super(ProviderListener, self).__init__()
        # Maps each provider to True while its reply is still outstanding.
        self._waiting = dict.fromkeys(providers, True)
        self._method = "Other.{}.{}".format(ADDON_ID, method)
        self._timeout = timeout
        self._data = {}
        self._lock = threading.Lock()
        self._start_time = time.time()

    def onNotification(self, sender, method, data):
        """
        Store the decoded data of an awaited reply and call :func:`on_receive`.

        Notifications for other methods, from unknown senders, or from senders that
        already answered are ignored. Data that can't be decoded is logged and the
        sender is kept as outstanding.
        """
        logging.debug("Received notification with sender=%s, method=%s, data=%s", sender, method, data)
        with self._lock:
            if method != self._method or not self._waiting.get(sender, False):
                return
            try:
                self._data[sender] = json.loads(data)
            except Exception as e:
                logging.error("Unable to get data from sender '%s': %s", sender, e)
            else:
                self._waiting[sender] = False
                # NOTE(review): the hook runs while the (non-reentrant) lock is held, so
                # an override calling is_complete()/get_missing() would block - confirm.
                self.on_receive(sender)

    def on_receive(self, sender):
        """
        Hook called after a reply from `sender` has been stored. Does nothing by default.
        """
        pass

    def is_complete(self):
        """
        :return: Whether no provider reply is outstanding anymore.
        :rtype: bool
        """
        with self._lock:
            return all(not pending for pending in self._waiting.values())

    def wait(self, **kwargs):
        """
        Block until all replies arrived, the timeout elapsed or ``waitForAbort`` reports an abort.

        :keyword reset: When truthy, restart the timeout clock from now.
        :keyword timeout: Timeout in seconds, overriding the one given at construction.
        """
        if kwargs.get("reset"):
            self._start_time = time.time()
        timeout = kwargs.get("timeout", self._timeout)
        while True:
            if self.is_complete():
                break
            if 0 < timeout < time.time() - self._start_time:
                break
            # Wait in slices of 0.2 seconds so the other conditions get re-checked.
            if self.waitForAbort(0.2):
                break

    def get_missing(self):
        """
        :return: The providers whose reply is still outstanding.
        :rtype: list
        """
        with self._lock:
            return [provider for provider, pending in self._waiting.items() if pending]

    @property
    def data(self):
        """
        The decoded reply data, keyed by sender.
        """
        return self._data

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Wait for the outstanding replies and log the providers that timed out.

        :return: Always ``False``, so exceptions raised inside the block are not suppressed.
        """
        self.wait()
        unanswered = self.get_missing()
        if unanswered:
            logging.warning("Provider(s) timed out: %s", ", ".join(unanswered))
        return False