# -*- coding: utf-8 -*-
#################################################################################
# MIT License
#
# Copyright (c) 2025 Duncan Fraser
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#################################################################################
#
# File : kodiRename.py
#
# Author : Duncan Fraser <dfraser@thedrunkencoder.uk>
#
# Description :
#
#################################################################################
import logging
import os
import sys
from argparse import ArgumentParser, Namespace
from typing import Final, Optional, Tuple
from alive_progress import alive_bar
from . import collector, config, process
DEFAULT_CONFIG_FILE: Final = "config.ini"
OUTPUT_MOVIES: Final = "movie.bin"
OUTPUT_TV: Final = "tv.bin"
[docs]
def collectConfig(config_path: str = DEFAULT_CONFIG_FILE) -> config.ConfigSpec:
"""Parse Configuration from file. If file does not exist returns default configuration
:param config_path: Path to configuration file
:returns: Populated configuration
"""
config_data = config.ConfigSpec()
if not os.path.exists(config_path):
return config_data
with open(config_path, "rb") as conf_file:
config_data = config.parseConfig(configIO=conf_file)
return config_data
[docs]
def configLogger(
level: str, log_file: Optional[str] = None, log_std: bool = False
) -> logging.Logger:
"""Configure logging object
:param level: String representation of log level
:param log_file: Path to log file
:param log_std: If True output log to standard output
:returns: Logging Object
"""
logger = logging.getLogger(__name__)
form = logging.Formatter(
"%(asctime)s %(levelname)s [%(module)s:%(funcName)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log_levels = {
"debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL,
}
if level not in log_levels:
raise ValueError("Undefined Level {0}".format(level))
logger.setLevel(log_levels[level])
if log_file:
fh = logging.FileHandler(filename=log_file, mode="a")
fh.setFormatter(form)
logger.addHandler(fh)
if log_std:
sh = logging.StreamHandler(sys.stdout)
sh.setFormatter(form)
logger.addHandler(sh)
return logger
[docs]
def parseArgs(config_data: config.ConfigSpec) -> Tuple[Namespace, config.ConfigSpec]:
"""Parse command line arguments
:params config_data: Configuration data, will be modified and returned
:returns: Tuple containing parsed arguments and modified configuration information
"""
desc_str = "Utility for renaming video files from a kodi library"
parser = ArgumentParser(prog="kodiRename", description=desc_str)
parser.add_argument("--config", "-c", required=False, default=DEFAULT_CONFIG_FILE)
parser.add_argument(
"--debug",
"-d",
action="store",
default="info",
choices=["debug", "info", "warning", "error", "critical"],
required=False,
)
parser.add_argument("--logfile", action="store", required=False, metavar="xxx.log")
parser.add_argument(
"--quiet", "-q", action="store_true", required=False, default=False
)
subparsers = parser.add_subparsers(dest="command", required=True)
init = subparsers.add_parser("init", help="Initialize config file") # noqa: F841
collection = subparsers.add_parser( # noqa: F841
"collect", help="Collect Information"
)
rename = subparsers.add_parser("rename", help="Use Information")
clean = subparsers.add_parser("clean", help="Print Information")
stats = subparsers.add_parser("stats", help="Shows collections statistics") # type: ignore # noqa: F841
rename.add_argument("--dryRun", required=False, action="store_true")
clean.add_argument("--dryRun", required=False, action="store_true")
args = parser.parse_args()
if args.config:
config_data = collectConfig(args.config)
if args.logfile:
config_data.general.logFile = args.logfile
return (args, config_data)
[docs]
def init(config_path: str = DEFAULT_CONFIG_FILE) -> int:
"""Write default configuration to file
:param config_path: Path to configuration file
:returns: Status Code
"""
with open(config_path, "w") as conf_file:
config.writeDefaultConfig(configIO=conf_file)
return 0
[docs]
def collect(config_data: config.ConfigSpec) -> int:
"""Collect Move and TV information and serialize
:param config: Configuration object
:returns: Status Code
"""
movie_info = collector.getMovieInfo(config=config_data)
tv_info = collector.getTvInfo(config=config_data)
if not os.path.isdir(config_data.general.dataDirectory):
os.makedirs(config_data.general.dataDirectory, exist_ok=True)
with open(
os.path.join(config_data.general.dataDirectory, OUTPUT_MOVIES), "wb"
) as movie_file:
collector.serializeMovieInfo(pickleIO=movie_file, movieInfo=movie_info)
with open(
os.path.join(config_data.general.dataDirectory, OUTPUT_TV), "wb"
) as tv_file:
collector.serializeTVInfo(pickleIO=tv_file, tvInfo=tv_info)
movie_stats = collector.movieInfoStatistics(data=movie_info)
tv_stats = collector.tvInfoStatistics(data=tv_info)
config_data.log.info(
"Collection complete. Found %s Movies and %s Episodes over %s Tv Shows",
movie_stats.count,
tv_stats.episodeCount,
tv_stats.count,
)
return 0
[docs]
def rename(config_data: config.ConfigSpec, dry_run: bool = False) -> int:
"""Perform rename operations from serialized data
:param config: Configuration object
:param dry_run: If True, will perform as dry run of rename operation
:returns: Status Code
"""
with open(
os.path.join(config_data.general.dataDirectory, OUTPUT_MOVIES), "rb"
) as movie_file:
movie_info = collector.deserializeMovieInfo(pickleIO=movie_file)
with open(
os.path.join(config_data.general.dataDirectory, OUTPUT_TV), "rb"
) as tv_file:
tv_info = collector.deserializeTVInfo(pickleIO=tv_file)
movie_rename = process.collectMovieRename(config=config_data, movie_list=movie_info)
tv_rename = process.collectTvRename(config=config_data, tv_info=tv_info)
with alive_bar(len(movie_rename), title="Movie Rename") as progress:
for movie in movie_rename:
config_data.log.info(
"{2} Movie : {0} --> {1}".format(
movie.src.path,
movie.dest.path,
"Moving" if config_data.general.mode == "move" else "Copying",
)
)
process.performRenameOperation(
config=config_data, data=movie, dry_run=dry_run
)
progress()
with alive_bar(len(tv_rename), title="TV Rename ") as progress:
for tv in tv_rename:
config_data.log.info(
"{2} Episode : {0} --> {1}".format(
tv.src.path,
tv.dest.path,
"Moving" if config_data.general.mode == "move" else "Copying",
)
)
process.performRenameOperation(config=config_data, data=tv, dry_run=dry_run)
progress()
return 0
[docs]
def clean(config_data: config.ConfigSpec, dry_run: bool = False) -> int:
"""Perform clean operation
:param config: Configuration object
:param dry_run: If True, will perform as dry run of clean operation
:returns: Status Code
"""
with open(
os.path.join(config_data.general.dataDirectory, OUTPUT_MOVIES), "rb"
) as movie_file:
movie_info = collector.deserializeMovieInfo(pickleIO=movie_file)
with open(
os.path.join(config_data.general.dataDirectory, OUTPUT_TV), "rb"
) as tv_file:
tv_info = collector.deserializeTVInfo(pickleIO=tv_file)
movie_rename = process.collectMovieRename(config=config_data, movie_list=movie_info)
tv_rename = process.collectTvRename(config=config_data, tv_info=tv_info)
with alive_bar(len(movie_rename), title="Movie Clean") as progress:
for movie in movie_rename:
config_data.log.info(
"Deleting Movie Directory : {0}".format(movie.src.parentPath)
)
process.performCleanDirectory(
config=config_data, data=movie, dry_run=dry_run
)
progress()
with alive_bar(len(tv_rename), title="TV Clean ") as progress:
for tv in tv_rename:
config_data.log.info(
"Deleting TV Directory : {0}".format(tv.src.parentPath)
)
process.performCleanDirectory(config=config_data, data=tv, dry_run=dry_run)
progress()
return 0
def stats(config_data: config.ConfigSpec) -> int:
"""Perform clean operation
:param config: Configuration object
:returns: Status Code
"""
with open(
os.path.join(config_data.general.dataDirectory, OUTPUT_MOVIES), "rb"
) as movie_file:
movie_info = collector.deserializeMovieInfo(pickleIO=movie_file)
with open(
os.path.join(config_data.general.dataDirectory, OUTPUT_TV), "rb"
) as tv_file:
tv_info = collector.deserializeTVInfo(pickleIO=tv_file)
movie_stats = collector.movieInfoStatistics(data=movie_info)
tv_stats = collector.tvInfoStatistics(data=tv_info)
config_data.log.info(
"%s Movies and %s Episodes over %s Tv Shows",
movie_stats.count,
tv_stats.episodeCount,
tv_stats.count,
)
return 0
[docs]
def main() -> int:
"""Main process"""
config_data = collectConfig(DEFAULT_CONFIG_FILE)
args, config_data = parseArgs(config_data=config_data)
if not os.path.isdir(config_data.general.dataDirectory):
os.makedirs(config_data.general.dataDirectory, exist_ok=True)
if not os.path.isdir(os.path.dirname(config_data.general.logFile)):
if os.path.dirname(config_data.general.logFile) != "":
os.makedirs(os.path.dirname(config_data.general.logFile), exist_ok=True)
logger = configLogger(
level=args.debug, log_file=config_data.general.logFile, log_std=(not args.quiet)
)
config_data.log = logger
status = 0
if args.command == "init":
status = init(args.config)
elif args.command == "collect":
status = collect(config_data=config_data)
elif args.command == "rename":
status = rename(config_data=config_data, dry_run=args.dryRun)
elif args.command == "clean":
status = clean(config_data=config_data, dry_run=args.dryRun)
elif args.command == "stats":
status = stats(config_data=config_data)
return status
if __name__ == "__main__":
sys.exit(main())