# Source code for kodiRename.collector.collector

# -*- coding: utf-8 -*-
#################################################################################
# MIT License
#
# Copyright (c) 2025 Duncan Fraser
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#################################################################################
#
# File          : collector.py
#
# Author        : Duncan Fraser <dfraser@thedrunkencoder.uk>
#
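# Description   : Collects movie, TV show and episode metadata from Kodi over
#                 JSON-RPC, parses it into dataclasses, and (de)serializes
#                 the collected results with pickle.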
#
#################################################################################

import os
import pickle
from dataclasses import dataclass, field
from typing import BinaryIO, Final, List, Optional, Set

import requests

from kodiRename.config import ConfigSpec, driveMappingConfig

# Video height buckets: inclusive "min"/"max" bounds and the "label" reported
# for a height that falls inside them. Entries are listed in ascending order.
videoSizeMap: Final[List[dict]] = [
    {"min": lowest, "max": highest, "label": tag}
    for lowest, highest, tag in (
        (0, 360, "SD"),
        (361, 480, "480"),
        (481, 576, "576"),
        (577, 720, "720p"),
        (721, 1080, "1080p"),
        (1081, 100000, "2160p"),
    )
]


@dataclass
class fileInfo:
    """Path components describing a single media file"""

    path: str
    """Complete path to the file"""

    parentPath: str
    """Directory that contains the file"""

    extension: str
    """Extension part of the file name"""

    filename: str
    """File name with the extension removed"""

    basePath: str
    """Common base path associated with the file"""
def parseFileInfo(path: str, base_path: str) -> fileInfo:
    """Split a path string into its File Information parts

    :param path: Full path to file
    :param base_path: Base Common Path, stored unchanged on the result
    :returns: File Information built from ``path``
    """
    # dirname/basename pair of the full path
    directory, leaf = os.path.split(path)
    # name without extension comes from the last path component only
    stem = os.path.splitext(leaf)[0]
    # the extension is taken from the full path
    suffix = os.path.splitext(path)[1]
    return fileInfo(
        path=path,
        parentPath=directory,
        extension=suffix,
        filename=stem,
        basePath=base_path,
    )
def parseVideoFormat(video_details: List[dict]) -> str:
    """Use video details from Kodi JSONRPC to get the Video Format

    Only the first entry of ``video_details`` is inspected. Its ``height``
    is matched against ``videoSizeMap`` and the label of the smallest
    bucket whose ``max`` is not below the height is returned.

    :param video_details: Video details extracted from kodi data
    :returns: String describing item Video Format; ``"SD"`` when there are
        no details or the height is missing, zero or negative
    """
    fallback = "SD"
    if not video_details:
        return fallback

    # .get() so an entry without a "height" key falls back instead of raising
    height = video_details[0].get("height")
    if not height or height <= 0:
        return fallback

    # Walk the buckets by ascending upper bound. Comparing against "max" only
    # means a value between two buckets (e.g. 360.5) still gets a label.
    buckets = sorted(videoSizeMap, key=lambda entry: entry["max"])
    for bucket in buckets:
        if height <= bucket["max"]:
            return bucket["label"]

    # Taller than every bucket: report the largest known format rather than
    # failing with an IndexError.
    return buckets[-1]["label"] if buckets else fallback
def replaceDriveMapping(path: str, drive_mapping: List[driveMappingConfig]) -> str:
    """Replace paths using the available drive mappings

    The first mapping whose ``src`` is a prefix of ``path`` wins. Only that
    leading prefix is swapped for ``dest``; later occurrences of ``src``
    inside the path are left untouched.

    :param path: Path to file
    :param drive_mapping: List of path replacements
    :returns: Path with the matching prefix replaced, or ``path`` unchanged
        when no mapping applies
    """
    for mapping in drive_mapping:
        # An empty source would match every path; skip it but keep checking
        # the remaining mappings.
        if not mapping.src:
            continue
        if path.startswith(mapping.src):
            # Rewrite the prefix only (str.replace would touch every match).
            return mapping.dest + path[len(mapping.src):]
    return path
def slugify(source_string: str, remove_string: str = "%:/,.\\[]<>*?") -> str:
    """Strip unwanted characters out of a string

    :param source_string: Text to clean
    :param remove_string: Every character in this string is dropped from
        ``source_string``
    :returns: ``source_string`` with all characters of ``remove_string``
        removed
    """
    # a translation table that maps each unwanted character to "deleted"
    deletions = str.maketrans("", "", remove_string)
    return source_string.translate(deletions)
@dataclass
class MovieInfo:
    """A single movie as parsed from Kodi data"""

    title: str
    """Title of the movie"""

    year: int
    """Year of release"""

    file: fileInfo
    """File Information for the movie file"""

    formatString: str
    """Video Format label derived from the video stream details"""

    data: Optional[dict] = None
    """Raw data fetched from Kodi, when it was kept"""
@dataclass
class EpisodeInfo:
    """A single TV episode as parsed from Kodi data"""

    title: str
    """Title of the episode"""

    season: int
    """Number of the season the episode belongs to"""

    episode: int
    """Number of the episode inside its season"""

    tvShowId: int
    """ID of the TV Show the episode belongs to"""

    file: fileInfo
    """File Information for the episode file"""

    data: Optional[dict] = None
    """Raw data fetched from Kodi, when it was kept"""
@dataclass
class TVInfo:
    """A TV show, with its episodes, as parsed from Kodi data"""

    title: str
    """Title of the TV show"""

    year: int
    """Year of release"""

    tvShowId: int
    """ID of the TV Show"""

    seasonSet: Set[int] = field(default_factory=set)
    """Season numbers seen for this show"""

    episodes: List[EpisodeInfo] = field(default_factory=list)
    """Episode information entries belonging to this show"""

    data: Optional[dict] = None
    """Raw data fetched from Kodi, when it was kept"""
def getMovieData(config: ConfigSpec, start: int = 0, end: int = -1) -> List[dict]:
    """Get Movie Information from Kodi JSONRPC

    Posts a ``VideoLibrary.GetMovies`` request to
    ``<scheme>://<host>:<port>/jsonrpc``, where the scheme follows
    ``config.kodi.use_https``.

    :param config: Configuration object (``config.kodi`` supplies
        ``use_https``, ``host`` and ``host_port``)
    :param start: Value sent as ``limits.start``
    :param end: Value sent as ``limits.end``
    :returns: Dictionary List from Raw Data from JSONRPC; an empty list when
        the HTTP status is not 200 or the reply carries no ``movies`` entry
    """
    request_id = 1010
    payload = {
        "jsonrpc": "2.0",
        "method": "VideoLibrary.GetMovies",
        "params": {
            "properties": ["title", "year", "file", "streamdetails"],
            "limits": {"start": start, "end": end},
        },
        "id": request_id,
    }
    scheme = "https" if config.kodi.use_https else "http"
    url = f"{scheme}://{config.kodi.host}:{config.kodi.host_port}/jsonrpc"

    # NOTE(review): verify=False turns off TLS certificate verification; kept
    # as-is, presumably for self-signed Kodi hosts -- confirm this is intended.
    # NOTE(review): no timeout is passed, so the call can block indefinitely.
    response = requests.post(url=url, json=payload, verify=False)
    if response.status_code != 200:
        return []

    # A 200 reply may still lack "result" (JSON-RPC error object) or the
    # "movies" list; treat both as "nothing found" instead of raising KeyError.
    result = response.json().get("result") or {}
    return list(result.get("movies") or [])
def getTvData(config: ConfigSpec, start: int = 0, end: int = -1) -> List[dict]:
    """Get TV Information from Kodi JSONRPC

    Posts a ``VideoLibrary.GetTVShows`` request to
    ``<scheme>://<host>:<port>/jsonrpc``, where the scheme follows
    ``config.kodi.use_https``.

    :param config: Configuration object (``config.kodi`` supplies
        ``use_https``, ``host`` and ``host_port``)
    :param start: Value sent as ``limits.start``
    :param end: Value sent as ``limits.end``
    :returns: Dictionary List from Raw Data from JSONRPC; an empty list when
        the HTTP status is not 200 or the reply carries no ``tvshows`` entry
    """
    request_id = 2020
    payload = {
        "jsonrpc": "2.0",
        "method": "VideoLibrary.GetTVShows",
        "params": {
            "properties": ["title", "year"],
            "limits": {"start": start, "end": end},
        },
        "id": request_id,
    }
    scheme = "https" if config.kodi.use_https else "http"
    url = f"{scheme}://{config.kodi.host}:{config.kodi.host_port}/jsonrpc"

    # NOTE(review): verify=False turns off TLS certificate verification; kept
    # as-is, presumably for self-signed Kodi hosts -- confirm this is intended.
    # NOTE(review): no timeout is passed, so the call can block indefinitely.
    response = requests.post(url=url, json=payload, verify=False)
    if response.status_code != 200:
        return []

    # A 200 reply may still lack "result" (JSON-RPC error object) or the
    # "tvshows" list; treat both as "nothing found" instead of raising KeyError.
    result = response.json().get("result") or {}
    return list(result.get("tvshows") or [])
def getEpisodeData(config: ConfigSpec, start: int = 0, end: int = -1) -> List[dict]:
    """Get Episode Information from Kodi JSONRPC

    Posts a ``VideoLibrary.GetEpisodes`` request to
    ``<scheme>://<host>:<port>/jsonrpc``. The scheme follows
    ``config.kodi.use_https``, matching the movie and TV show requests.

    :param config: Configuration object (``config.kodi`` supplies
        ``use_https``, ``host`` and ``host_port``)
    :param start: Value sent as ``limits.start``
    :param end: Value sent as ``limits.end``
    :returns: Dictionary List from Raw Data from JSONRPC; an empty list when
        the HTTP status is not 200 or the reply carries no ``episodes`` entry
    """
    request_id = 3030
    payload = {
        "jsonrpc": "2.0",
        "method": "VideoLibrary.GetEpisodes",
        "params": {
            "properties": [
                "title",
                "tvshowid",
                "season",
                "episode",
                "showtitle",
                "file",
            ],
            "limits": {"start": start, "end": end},
        },
        "id": request_id,
    }
    # Honour the https setting (this URL used to be hard-coded to http).
    scheme = "https" if config.kodi.use_https else "http"
    url = f"{scheme}://{config.kodi.host}:{config.kodi.host_port}/jsonrpc"

    # NOTE(review): verify=False turns off TLS certificate verification; kept
    # as-is, presumably for self-signed Kodi hosts -- confirm this is intended.
    # NOTE(review): no timeout is passed, so the call can block indefinitely.
    response = requests.post(url=url, json=payload, verify=False)
    if response.status_code != 200:
        return []

    # A 200 reply may still lack "result" (JSON-RPC error object) or the
    # "episodes" list; treat both as "nothing found" instead of raising KeyError.
    result = response.json().get("result") or {}
    return list(result.get("episodes") or [])
def parseMovieInfo(
    config: ConfigSpec, data: dict, appendRaw: bool = False
) -> MovieInfo:
    """Build Movie Information from one Kodi JSONRPC movie record

    :param config: Configuration object (drive mappings and movie base path)
    :param data: One movie record from Kodi JSONRPC
    :param appendRaw: Keep ``data`` on the result when True
    :returns: Movie Information
    """
    # translate the Kodi-side path before splitting it into components
    mapped_path = replaceDriveMapping(
        path=data["file"], drive_mapping=config.driveMapping
    )
    clean_title = slugify(data["title"])
    file_details = parseFileInfo(mapped_path, config.movie_config.basePath)
    video_format = parseVideoFormat(data["streamdetails"]["video"])
    release_year = data["year"]
    raw = data if appendRaw else None
    return MovieInfo(
        title=clean_title,
        year=release_year,
        file=file_details,
        formatString=video_format,
        data=raw,
    )
def parseTvInfo(data: dict, appendRaw: bool = False) -> TVInfo:
    """Build TV Information from one Kodi JSONRPC TV show record

    :param data: One TV show record from Kodi JSONRPC
    :param appendRaw: Keep ``data`` on the result when True
    :returns: TV Information (seasons and episodes start out empty)
    """
    clean_title = slugify(data["title"])
    release_year = data["year"]
    show_id = data["tvshowid"]
    raw = data if appendRaw else None
    return TVInfo(title=clean_title, year=release_year, tvShowId=show_id, data=raw)
def parseEpisodeInfo(
    config: ConfigSpec, data: dict, appendRaw: bool = False
) -> EpisodeInfo:
    """Build Episode Information from one Kodi JSONRPC episode record

    :param config: Configuration object (drive mappings and TV base path)
    :param data: One episode record from Kodi JSONRPC
    :param appendRaw: Keep ``data`` on the result when True
    :returns: Episode Information
    """
    # translate the Kodi-side path before splitting it into components
    mapped_path = replaceDriveMapping(
        path=data["file"], drive_mapping=config.driveMapping
    )
    clean_title = slugify(data["title"])
    season_number = data["season"]
    episode_number = data["episode"]
    file_details = parseFileInfo(mapped_path, config.tv_config.basePath)
    show_id = data["tvshowid"]
    raw = data if appendRaw else None
    return EpisodeInfo(
        title=clean_title,
        season=season_number,
        episode=episode_number,
        tvShowId=show_id,
        file=file_details,
        data=raw,
    )
def getMovieInfo(config: ConfigSpec) -> List[MovieInfo]:
    """Fetch every movie from Kodi and parse each into Movie Information

    :param config: Configuration object
    :returns: List of Movie Information, one per record returned by the
        Kodi JSONRPC movie query
    """
    raw_movies = getMovieData(config=config, start=0, end=0)
    return [parseMovieInfo(config=config, data=entry) for entry in raw_movies]
def getTvInfo(config: ConfigSpec) -> List[TVInfo]:
    """Fetch TV shows and episodes from Kodi and combine them

    Each episode is attached to the show with the same TV Show ID, and its
    season number is added to that show's season set. Episodes whose show
    was not returned by the TV show query are left out.

    :param config: Configuration object
    :returns: List of TV Information collected from Kodi JSONRPC
    """
    shows_by_id: dict[int, TVInfo] = {}
    for raw_show in getTvData(config=config, start=0, end=0):
        show = parseTvInfo(raw_show)
        shows_by_id[show.tvShowId] = show

    for raw_episode in getEpisodeData(config=config, start=0, end=0):
        parsed = parseEpisodeInfo(config=config, data=raw_episode)
        owner = shows_by_id.get(parsed.tvShowId)
        if owner is None:
            # no matching show was collected for this episode
            continue
        owner.episodes.append(parsed)
        owner.seasonSet.add(parsed.season)

    return list(shows_by_id.values())
def serializeMovieInfo(pickleIO: BinaryIO, movieInfo: List[MovieInfo]) -> None:
    """Write Movie Information to File

    The list is written with :func:`pickle.dump`.

    :param pickleIO: Writable Binary I/O object
    :param movieInfo: List of Movie Information

    .. code-block:: python

        movie_info = []
        with open("output", "wb") as file:
            kodiRename.collector.serializeMovieInfo(pickleIO=file, movieInfo=movie_info)
    """
    pickle.dump(movieInfo, file=pickleIO)
def deserializeMovieInfo(pickleIO: BinaryIO) -> List[MovieInfo]:
    """Get Movie Information from File

    The content is read with :func:`pickle.load`.

    .. warning::
        Unpickling can execute arbitrary code. Only pass files that this
        tool wrote itself, never data from an untrusted source.

    :param pickleIO: Readable Binary I/O object
    :returns: Movie Information

    .. code-block:: python

        with open("output", "rb") as file:
            movie_info = kodiRename.collector.deserializeMovieInfo(pickleIO=file)
    """
    # NOTE(review): pickle.load on external data is unsafe; see warning above.
    return pickle.load(pickleIO)
def serializeTVInfo(pickleIO: BinaryIO, tvInfo: List[TVInfo]) -> None:
    """Write TV Information to File

    The list is written with :func:`pickle.dump`.

    :param pickleIO: Writable Binary I/O object
    :param tvInfo: List of TV Information

    .. code-block:: python

        tv_info = []
        with open("output", "wb") as file:
            kodiRename.collector.serializeTVInfo(pickleIO=file, tvInfo=tv_info)
    """
    pickle.dump(tvInfo, file=pickleIO)
def deserializeTVInfo(pickleIO: BinaryIO) -> List[TVInfo]:
    """Get TV Information from File

    The content is read with :func:`pickle.load`.

    .. warning::
        Unpickling can execute arbitrary code. Only pass files that this
        tool wrote itself, never data from an untrusted source.

    :param pickleIO: Readable Binary I/O object
    :returns: List of TV Information

    .. code-block:: python

        with open("output", "rb") as file:
            tv_info = kodiRename.collector.deserializeTVInfo(pickleIO=file)
    """
    # NOTE(review): pickle.load on external data is unsafe; see warning above.
    return pickle.load(pickleIO)
@dataclass
class MovieInfoStatistics:
    """Counters describing a collected list of movies"""

    count: int
    """Number of movies"""


@dataclass
class TvInfoStatistics:
    """Counters describing a collected list of TV shows"""

    count: int
    """Number of TV shows"""

    episodeCount: int
    """Number of episodes summed over all TV shows"""


def movieInfoStatistics(data: List[MovieInfo]) -> MovieInfoStatistics:
    """Summarise a list of collected Movies

    :param data: List of Movies
    :returns: Statistics holding the number of movies
    """
    total_movies = len(data)
    return MovieInfoStatistics(count=total_movies)


def tvInfoStatistics(data: List[TVInfo]) -> TvInfoStatistics:
    """Summarise a list of collected Tv Shows

    :param data: List of Tv Shows
    :returns: Statistics holding the number of shows and the total number
        of episodes across them
    """
    total_episodes = sum(len(show.episodes) for show in data)
    return TvInfoStatistics(count=len(data), episodeCount=total_episodes)