Source code for kodiRename.process.process

# -*- coding: utf-8 -*-
#################################################################################
# MIT License
#
# Copyright (c) 2025 Duncan Fraser
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#################################################################################
#
# File          : process.py
#
# Author        : Duncan Fraser <dfraser@thedrunkencoder.uk>
#
# Description   :
#
#################################################################################

import os
import shutil
from dataclasses import dataclass
from typing import List, Literal, Optional

from kodiRename.collector import EpisodeInfo, MovieInfo, TVInfo, fileInfo, parseFileInfo
from kodiRename.config import ConfigSpec, movieConfig, tvConfig


[docs] def formatMovieParent(movie_info: MovieInfo, movie_config: movieConfig) -> str: """Generate the Parent Path for storing Movie :param movie_info: Movie Information :param movie_config: Configuration Options for Movies from Base Configuration :returns: Directory Name """ format_map = { "TITLE": movie_info.title, "YEAR": movie_info.year, "FORMAT": movie_info.formatString, } movie_parent = movie_config.parentFormat.format_map(format_map) return movie_parent
[docs] def formatMovieFile(movie_info: MovieInfo, movie_config: movieConfig) -> str: """Generate the file name for storing Movie :param movie_info: Movie Information :param movie_config: Configuration Options for Movies from Base Configuration :returns: Filename """ format_map = { "TITLE": movie_info.title, "YEAR": movie_info.year, "FORMAT": movie_info.formatString, } movie_file = movie_config.fileFormat.format_map(format_map) return movie_file
[docs] def formatTvParent( tv_info: TVInfo, episode_info: EpisodeInfo, tv_config: tvConfig ) -> str: """Generate the Parent Path for storing Episodes :param tv_info: TV Show Information :param episode_info: Episode Information :param tv_config: Configuration Options for TV Shows from Base Configuration :returns: Directory Name """ format_map = { "TITLE": tv_info.title, "YEAR": tv_info.year, } tv_parent = tv_config.parentFormat.format_map(format_map) return tv_parent
[docs] def formatTvSeason( tv_info: TVInfo, episode_info: EpisodeInfo, tv_config: tvConfig ) -> str: """Generate the Season Path for storing Episodes :param tv_info: TV Show Information :param episode_info: Episode Information :param tv_config: Configuration Options for TV Shows from Base Configuration :returns: Directory Name """ format_map = { "TITLE": tv_info.title, "YEAR": tv_info.year, "SEASON": episode_info.season, } tv_season = tv_config.seasonFormat.format_map(format_map) return tv_season
[docs] def formatTvEpisode( tv_info: TVInfo, episode_info: EpisodeInfo, tv_config: tvConfig ) -> str: """Generate the file name for storing Episodes :param tv_info: TV Show Information :param episode_info: Episode Information :param tv_config: Configuration Options for TV Shows from Base Configuration :returns: Filename """ format_map = { "SHOW_TITLE": tv_info.title, "TITLE": episode_info.title, "YEAR": tv_info.year, "SEASON": episode_info.season, "EPISODE": episode_info.episode, } tv_episode = tv_config.fileFormat.format_map(format_map) return tv_episode
[docs] @dataclass class renameOperation: """Rename Operation Stores the source and destination and provides a status string """ src: fileInfo """Source file information""" dest: fileInfo """Destination file information""" status: Optional[Literal["ok", "warning", "fail", "skip"]] = None """Status of operation"""
[docs] def performRenameOperation( config: ConfigSpec, data: renameOperation, dry_run: bool = False ) -> renameOperation: """Perform Rename Operation Will rename the source file path to the destination file path and create the path if it does not exist. The rename will not be performed if the source and destination file path are the same. If the mode is "copy" will copy the file before renaming. If the mode is "move" will move and rename. :param config: Configuration object :param data: Rename operation to perform :returns: Rename Operation """ if data.src == data.dest: data.status = "skip" config.log.debug( "Source and Destination are the same path : {0}".format(data.src.filename) ) return data if not os.path.exists(data.src.path): config.log.debug("Source does not exist : {0}".format(data.src.path)) data.status = "fail" return data if not os.path.isdir(data.dest.parentPath): os.makedirs(data.dest.parentPath, exist_ok=True) if not dry_run: if config.general.mode == "move": shutil.move(src=data.src.path, dst=data.dest.path) elif config.general.mode == "copy": shutil.copy(src=data.src.path, dst=data.dest.path) data.status = "ok" return data
[docs] def performCleanDirectory( config: ConfigSpec, data: renameOperation, dry_run: bool = False ) -> bool: """Clean Source Directory Delete parent path of source file information. Will not be performed if the destination parent path is the same as the source parent path :param config: Configuration object :param data: Rename Operation to delete src parent path """ if not os.path.isdir(data.src.parentPath): config.log.debug("Parent Path does not exit: {0}".format(data.src.parentPath)) return False if data.src.parentPath == data.src.basePath: config.log.debug( "Parent Path and Base Path are the same: {0}".format(data.src.basePath) ) return False if data.src.parentPath == data.dest.parentPath: config.log.debug( "Source and Destination Parent Path are the same: {0}".format( data.src.parentPath ) ) return False if not dry_run: shutil.rmtree(path=data.src.parentPath) else: config.log.debug("Dry Run Would remove: {0}".format(data.src.parentPath)) return True
[docs] def processMovieRename(config: ConfigSpec, movie_info: MovieInfo) -> fileInfo: """Process Movie Information into Output File Information :param config: Configuration object :param movie_info: Movie Information :returns: File information for output file """ new_parent_path = formatMovieParent( movie_info=movie_info, movie_config=config.movie_config ) new_filename = formatMovieFile( movie_info=movie_info, movie_config=config.movie_config ) if config.movie_config.outputDirectory is not None: new_path = os.path.join( config.movie_config.outputDirectory, new_parent_path, new_filename + movie_info.file.extension, ) base_path = config.movie_config.outputDirectory else: new_path = os.path.join( config.movie_config.basePath, new_parent_path, new_filename + movie_info.file.extension, ) base_path = config.movie_config.basePath file_info = parseFileInfo(path=new_path, base_path=base_path) return file_info
[docs] def processTvRename( config: ConfigSpec, tv_info: TVInfo, episode_info: EpisodeInfo ) -> fileInfo: """Process Episodes Information into Output File Information :param config: Configuration object :param tv_info: TV information :param episode_info: Episode information :returns: File information for output file """ new_parent_path = formatTvParent( tv_info=tv_info, episode_info=episode_info, tv_config=config.tv_config ) new_season_path = formatTvSeason( tv_info=tv_info, episode_info=episode_info, tv_config=config.tv_config ) new_filename = formatTvEpisode( tv_info=tv_info, episode_info=episode_info, tv_config=config.tv_config ) if config.tv_config.outputDirectory is not None: new_path = os.path.join( config.tv_config.outputDirectory, new_parent_path, new_season_path, new_filename + episode_info.file.extension, ) base_path = config.tv_config.outputDirectory else: new_path = os.path.join( config.tv_config.basePath, new_parent_path, new_season_path, new_filename + episode_info.file.extension, ) base_path = config.tv_config.basePath file_info = parseFileInfo(path=new_path, base_path=base_path) return file_info
[docs] def collectMovieRename( config: ConfigSpec, movie_list: list[MovieInfo] ) -> List[renameOperation]: """Collect Movie Rename Operations from Movie Information :param config: Configuration object :param movie_list: List of Movie Info :returns: List of rename operations to be performed """ file_list: List[renameOperation] = [] for movie in movie_list: new_file_info = processMovieRename(config=config, movie_info=movie) file_list.append(renameOperation(src=movie.file, dest=new_file_info)) return file_list
[docs] def collectTvRename(config: ConfigSpec, tv_info: List[TVInfo]) -> List[renameOperation]: """Collect TV Rename Operations from Movie Information :param config: Configuration object :param tv_info: List of TV info :returns: List of rename operations to be performed """ file_list: List[renameOperation] = [] for tv in tv_info: for episode in tv.episodes: new_file_info = processTvRename( config=config, tv_info=tv, episode_info=episode ) file_list.append(renameOperation(src=episode.file, dest=new_file_info)) return file_list