add some more tests and increase coverage, fix some bugz.

This commit is contained in:
Hellowlol
2018-06-28 01:19:30 +02:00
parent a0b31cbb52
commit 35b90c3ab0
9 changed files with 147 additions and 66 deletions

View File

@@ -9,6 +9,7 @@ exclude_lines =
raise NotImplementedError
except ImportError
except KeyboardInterrupt
# Don't complain if non-runnable code isn't run:
if 0:

View File

@@ -2,6 +2,7 @@ language: python
python:
- "2.7"
- "3.6"
#- "3.7-dev" # lets wait until 3.7 is out.
before_install:
- sudo add-apt-repository -y ppa:mc3man/trusty-media
- sudo apt-get -y update

View File

@@ -29,7 +29,7 @@ subcommands = ['watch', 'add_theme_to_hashtable', 'check_db', 'export_db',
'set_manual_theme_time', 'test_a_movie']
def trim_argv():
def trim_argv(): # pragma: no cover
"""Remove any sub commands and arguments for subcommands."""
args = sys.argv[:]
for cmd in subcommands:

View File

@@ -1,41 +1,25 @@
import os
import sys
def fake_main():
import bw_plex
default_folder = None
debug = False
config = None
subcommands = ['watch', 'add_theme_to_hashtable', 'check_db', 'export_db',
'ffmpeg_process', 'manually_correct_theme', 'process', 'match',
'set_manual_theme_time', 'test_a_movie']
def trim_argv():
args = sys.argv[:]
for cmd in subcommands:
try:
idx = args.index(cmd)
return args[idx:]
except ValueError:
pass
return []
for i, e in enumerate(trim_argv()):
args = bw_plex.trim_argv()
for i, e in enumerate(args): # pragma: no cover
if e == '--default_folder' or e == '-df':
default_folder = sys.argv[i + 1]
default_folder = args[i + 1]
if e == '--debug' or e == '-d':
debug = True
if e == '--config' or e == '-c':
config = sys.argv[i + 1]
config = args[i + 1]
import bw_plex
bw_plex.init(folder=default_folder, debug=debug, config=config)
from bw_plex.plex import real_main
real_main()

View File

@@ -35,13 +35,13 @@ def ignore_ratingkey(item, key):
if item.TYPE == 'movie':
return item.ratingKey in key
if item.TYPE == 'episode':
return any(i for i in [item.ratingKey, item.grandparentRatingKey, item.parentKey] if i in key)
return any(i for i in [item.ratingKey, item.grandparentRatingKey, item.parentRatingKey] if i in key)
return False
def get_pms(url=None, token=None, username=None,
password=None, servername=None, verify_ssl=None):
password=None, servername=None, verify_ssl=None): # pragma: no cover
url = url or CONFIG['server'].get('url')
token = token or CONFIG['server'].get('token')
@@ -65,7 +65,7 @@ def get_pms(url=None, token=None, username=None,
return PMS
def users_pms(pms, user):
def users_pms(pms, user): # pragma: no cover
"""Login on your server using the users access credentials."""
from plexapi.exceptions import NotFound
LOG.debug('Logging in on PMS as %s', user)
@@ -309,7 +309,7 @@ def get_valid_filename(s):
except (UnicodeError, UnicodeDecodeError, AttributeError):
try:
input_str = unicodedata.normalize('NFKD', input_str)
except:
except: # pragma: no cover
pass
return u''.join([c for c in input_str if not unicodedata.combining(c)])
@@ -353,7 +353,7 @@ def convert_and_trim(afile, fs=8000, trim=None, theme=False, filename=None):
psox = subprocess.Popen(cmd, stderr=subprocess.PIPE)
o, e = psox.communicate()
if not psox.returncode == 0:
if not psox.returncode == 0: # pragma: no cover
LOG.exception(e)
raise Exception("FFMpeg failed")
@@ -513,7 +513,7 @@ def search_for_theme_youtube(name, rk=1337, save_path=None, url=None):
ydl.download([name + ' theme song'])
return t + '.wav'
except:
except: # pragma: no cover
LOG.exception('Failed to download theme song %s' % name)
LOG.debug('Done downloading theme for %s', name)

View File

@@ -28,7 +28,7 @@ SHOWS = {}
HT = None
is_64bit = struct.calcsize('P') * 8
if not is_64bit:
if not is_64bit: # pragma: no cover
LOG.info('You not using a python 64 bit version.')
@@ -40,7 +40,8 @@ def log_exception(func):
return func(*args, **kwargs)
else:
return func(*args)
except:
except: # pragma: no cover
err = "There was an exception in "
err += func.__name__
LOG.exception(err)
@@ -49,7 +50,7 @@ def log_exception(func):
return inner
def find_all_movies_shows(func=None):
def find_all_movies_shows(func=None): # pragma: no cover
""" Helper of get all the shows on a server.
Args:
@@ -196,9 +197,9 @@ def process_to_db(media, theme=None, vid=None, start=None, end=None, ffmpeg_end=
@click.option('--servername', '-s', default=None, help='The server you want to monitor.')
@click.option('--url', default=None, help='url to the server you want to monitor')
@click.option('--token', '-t', default=None, help='plex-x-token')
@click.option('--config', '-c', default=None, help='Not in use atm.')
@click.option('--config', '-c', default=None, help='Path to config file.')
@click.option('--verify_ssl', '-vs', default=None, help='Enable this to allow insecure connections to PMS')
@click.option('--default_folder', '-df', default=None, help='default folder to store shit')
@click.option('--default_folder', '-df', default=None, help='Override for the default folder, typically used by dockers.')
def cli(debug, username, password, servername, url, token, config, verify_ssl, default_folder):
""" Entry point for the CLI."""
global PMS
@@ -432,7 +433,7 @@ def process(name, sample, threads, skip_done):
@click.option('-dv', default=0.5, type=float)
@click.option('-pix_th', default=0.10, type=float)
@click.option('-au_db', default=50, type=int)
def ffmpeg_process(name, trim, dev, da, dv, pix_th, au_db):
def ffmpeg_process(name, trim, dev, da, dv, pix_th, au_db): # pragma: no cover
"""Simple manual test for ffmpeg_process with knobs to turn."""
n = find_offset_ffmpeg(name, trim=trim, dev=dev, duration_audio=da,
@@ -469,7 +470,7 @@ def create_config(fp=None):
@click.option('-rk', help='Add rating key', default='auto')
@click.option('-jt', '--just_theme', default=False, is_flag=True)
@click.option('-rot', '--remove_old_theme', default=False, is_flag=True)
def manually_correct_theme(name, url, type, rk, just_theme, remove_old_theme):
def manually_correct_theme(name, url, type, rk, just_theme, remove_old_theme): # pragma: no cover
"""Set the correct fingerprint of the show in the hashes.db and
process the eps of that show in the db against the new theme fingerprint.
@@ -639,7 +640,7 @@ def check_file_access(m):
@log_exception
def client_action(offset=None, sessionkey=None, action='jump'):
def client_action(offset=None, sessionkey=None, action='jump'): # pragma: no cover
"""Seek the client to the offset.
Args:
@@ -651,7 +652,6 @@ def client_action(offset=None, sessionkey=None, action='jump'):
"""
global JUMP_LIST
LOG.debug('Called client_action with %s %s %s %s', offset, to_time(offset), sessionkey, action)
# LOG.debug('%s', JUMP_LIST)
def proxy_on_fail(func):
import plexapi
@@ -665,8 +665,8 @@ def client_action(offset=None, sessionkey=None, action='jump'):
LOG.debug('Failed to reach the client directly, trying via server.')
correct_client.proxyThroughServer()
func()
except:
correct_client.proxyThroughServer()
except: # pragma: no cover
correct_client.proxyThroughServer(value=False)
raise
if offset == -1:
@@ -722,6 +722,7 @@ def client_action(offset=None, sessionkey=None, action='jump'):
if ignore_ratingkey(media, CONFIG['general'].get('ignore_intro_ratingkeys')):
LOG.debug('Didnt send seek command this show, season or episode is ignored')
return
# PMP seems to be really picky about timeline calls, if we dont
# it returns 406 errors after 90 sec.
if correct_client.product == 'Plex Media Player':
@@ -768,7 +769,7 @@ def task(item, sessionkey):
global HT
media = PMS.fetchItem(int(item))
LOG.debug('Found %s', media._prettyfilename())
if media.TYPE not in ('episode', 'show', 'movie'):
if media.TYPE not in ('episode', 'show', 'movie'): # pragma: no cover
return
if media.TYPE == 'episode':
@@ -781,7 +782,7 @@ def task(item, sessionkey):
try:
os.remove(vid)
LOG.debug('Deleted %s', vid)
except IOError:
except IOError: # pragma: no cover
LOG.exception('Failed to delete %s', vid)
elif media.TYPE == 'movie':
@@ -789,7 +790,7 @@ def task(item, sessionkey):
try:
IN_PROG.remove(item)
except ValueError:
except ValueError: # pragma: no cover
LOG.debug('Failed to remove %s from IN_PROG', item)
nxt = find_next(media)
@@ -813,12 +814,6 @@ def check(data):
progress = sess.get('viewOffset', 0) / 1000 # converted to sec.
mode = CONFIG['general'].get('mode', 'skip_only_theme')
# This has to be removed if/when credits are added.
# Check if its possible to get the duration of the video some way if not we might need to
# get it via PMS.fetchItem(int(ratingkey))
# if progress >= 600:
# return
def best_time(item):
"""Find the best time in the db."""
if item.type == 'episode' and item.correct_theme_end and item.correct_theme_end != 1:
@@ -838,7 +833,7 @@ def check(data):
return sec
def jump(item, sessionkey, sec=None, action=None):
def jump(item, sessionkey, sec=None, action=None): # pragma: no cover
if sec is None:
sec = best_time(item)
@@ -914,8 +909,8 @@ def check(data):
LOG.debug('%s was added to %s', title, PMS.friendlyName)
# Youtubedl can fail if we batch add loads of eps at the same time if there is no
# theme.
if (metadata_type == 1 and CONFIG['movie'].get('process_recently_added') or
metadata_state == 4 and CONFIG['tv'].get('process_recently_added')):
if (metadata_type == 1 and not CONFIG['movie'].get('process_recently_added') or
metadata_state == 4 and not CONFIG['tv'].get('process_recently_added')):
LOG.debug("Didnt start to process %s is process_recently_added is disabled")
return
@@ -928,8 +923,8 @@ def check(data):
elif (metadata_type in (1, 4) and state == 9 and
metadata_state == 'deleted'):
if (metadata_type == 1 and CONFIG['movie'].get('process_deleted') or
metadata_state == 4 and CONFIG['tv'].get('process_deleted')):
if (metadata_type == 1 and not CONFIG['movie'].get('process_deleted') or
metadata_state == 4 and not CONFIG['tv'].get('process_deleted')):
LOG.debug("Didnt start to process %s is process_deleted is disabled for")
return
@@ -944,7 +939,7 @@ def check(data):
@cli.command()
@click.argument('-f', type=click.Path(exists=True))
def match(f):
def match(f): # pragma: no cover
"""Manual match for a file. This is useful for testing we finds the correct start and
end time."""
global HT
@@ -954,7 +949,7 @@ def match(f):
@cli.command()
def watch():
def watch(): # # pragma: no cover
"""Start watching the server for stuff to do."""
global HT
HT = get_hashtable()
@@ -972,7 +967,7 @@ def watch():
@cli.command()
@click.argument('name')
def test_a_movie(name):
def test_a_movie(name): # pragma: no cover
result = PMS.search(name)
if result:

View File

@@ -5,7 +5,7 @@ import tempfile
from datetime import datetime as DT
from plexapi.video import Episode, Show
from plexapi.video import Episode, Show, Movie
# from plexapi.compat import makedirs
from sqlalchemy.orm.exc import NoResultFound
import pytest
@@ -66,6 +66,7 @@ def episode(mocker):
ep.title = ''
ep.grandparentTitle = 'Dexter'
ep.ratingKey = 1337
ep.parentRatingKey = 1337
ep._server = ''
ep.title = 'Dexter'
ep.index = 1
@@ -86,6 +87,28 @@ def episode(mocker):
return ep
@pytest.fixture()
def film(mocker):
ep = mocker.MagicMock(spec=Movie)
ep.TYPE = 'movie'
ep.name = 'Random'
ep.title = 'Random'
ep.ratingKey = 7331
ep._server = ''
ep.duration = 60 * 60 * 1000 # 1h in ms
ep.updatedAt = DT(1970, 1, 1)
def _prettyfilename():
return 'Random'
def iterParts():
yield os.path.join(TEST_DATA, 'dexter_s03e01_intro.mkv')
ep._prettyfilename = _prettyfilename
return ep
@pytest.fixture()
def media(mocker, episode):
media = mocker.Mock(spec=Show)

View File

@@ -5,8 +5,9 @@ from conftest import plex
import click
def test_cli():
pass
def test_cli(cli_runner):
res = cli_runner.invoke(plex.cli, ['--help'])
click.echo(res.output)
def test_create_config(monkeypatch, cli_runner, tmpdir):
@@ -17,11 +18,18 @@ def test_create_config(monkeypatch, cli_runner, tmpdir):
assert os.path.exists(fullpath)
def test_check(episode, intro_file, cli_runner, tmpdir, monkeypatch, HT, mocker):
def fetchItem(i):
def test_check(episode, film, intro_file, cli_runner, tmpdir, monkeypatch, HT, mocker):
def fetchItem_ep(i):
return episode
def fetchItem_film(i):
return film
mf = mocker.Mock()
mf.fetchItem = fetchItem_film
m = mocker.Mock()
m.fetchItem = fetchItem
m.fetchItem = fetchItem_ep
def zomg(*args, **kwargs):
pass
@@ -30,11 +38,14 @@ def test_check(episode, intro_file, cli_runner, tmpdir, monkeypatch, HT, mocker)
monkeypatch.setattr(plex, 'HT', HT)
monkeypatch.setattr(plex, 'PMS', m)
monkeypatch.setitem(plex.CONFIG['tv'], 'check_credits', True)
monkeypatch.setitem(plex.CONFIG['tv'], 'process_deleted', True)
# monkeypatch.setitem(plex.CONFIG['movie'], 'process_deleted', True)
monkeypatch.setattr(plex, 'check_file_access', lambda k: intro_file)
monkeypatch.setattr(plex, 'find_next', lambda k: None)
monkeypatch.setattr(plex, 'client_action', zomg)
# tv
data = {"PlaySessionStateNotification": [{"guid": "",
"key": "/library/metadata/1337",
"playQueueItemID": 22631,
@@ -72,6 +83,41 @@ def test_check(episode, intro_file, cli_runner, tmpdir, monkeypatch, HT, mocker)
assert json.load(f)
item_deleted = {"type": "timeline",
"size": 1,
"TimelineEntry": [{"identifier": "com.plexapp.plugins.library",
"sectionID": 2,
"itemID": 1337,
"type": 4,
"title": "Dexter S01 E01",
"state": 9,
"mediaState": "deleted",
"queueSize": 8,
"updatedAt": 1526744644}]
}
r = plex.check(item_deleted)
if r:
r.get()
monkeypatch.setattr(plex, 'PMS', mf)
data_movie = {"PlaySessionStateNotification": [{"guid": "",
"key": "/library/metadata/7331",
"playQueueItemID": 22631,
"ratingKey": "7331",
"sessionKey": "84",
"state": "playing",
"transcodeSession": "4avh8p7h64n4e9a16xsqvr9e",
"url": "",
"viewOffset": 1000}],
"size": 1,
"type": "playing"
}
r = plex.check(data_movie)
if r:
r.get()
def _test_process_to_db(episode, intro_file, cli_runner, tmpdir, monkeypatch, HT, mocker):
# This is tested in check
@@ -102,20 +148,26 @@ def _test_process_to_db(episode, intro_file, cli_runner, tmpdir, monkeypatch, HT
assert json.load(f)
def test_process(cli_runner, monkeypatch, episode, media, HT, intro_file, mocker):
def test_process(cli_runner, monkeypatch, episode, film, media, HT, intro_file, mocker):
# Let the mock begin..
mocker.patch.object(plex, 'find_all_movies_shows', side_effect=[[media], [episode]])
mocker.patch('click.prompt', side_effect=['0', '0'])
def fetchItem(i):
def fetchItem_ep(i):
return episode
def fetchItem_m(i):
return film
m = mocker.Mock()
m.fetchItem = fetchItem
m.fetchItem = fetchItem_ep
def zomg(*args, **kwargs):
pass
mf = mocker.Mock()
mf.fetchItem = fetchItem_m
monkeypatch.setattr(plex, 'PMS', m)
monkeypatch.setitem(plex.CONFIG['tv'], 'theme_source', 'tvtunes')
monkeypatch.setattr(plex, 'check_file_access', lambda k: intro_file)
@@ -123,9 +175,12 @@ def test_process(cli_runner, monkeypatch, episode, media, HT, intro_file, mocker
monkeypatch.setattr(plex, 'find_next', lambda k: None)
res = cli_runner.invoke(plex.process, ['-n', 'dexter', '-s', '1', '-t', '2', '-sd'])
res = cli_runner.invoke(plex.process, ['-n', 'dexter', '-s', '1', '-t', '2'])
#print(res.output)
monkeypatch.setattr(plex, 'PMS', mf)
res = cli_runner.invoke(plex.process, ['-n', 'Random', '-s', '1', '-sd'])
def test_add_theme_to_hashtable(cli_runner, monkeypatch, HT):
# We just want to check that this doesnt blow up..

View File

@@ -15,6 +15,24 @@ def test_get_valid_filename():
assert misc.get_valid_filename('M*A*S*H') == 'MASH'
def test_ignoreratingkey(film, episode):
assert misc.ignore_ratingkey(episode, [1337])
assert misc.ignore_ratingkey(film, [7331])
assert not misc.ignore_ratingkey(episode, [113])
def test_sec_to_hh_mm_ss():
x = misc.sec_to_hh_mm_ss(60)
assert x == '00:01:00'
def test_findnxt(film, episode):
assert not misc.find_next(film)
# this should fail was there is no more
# episodes.
assert not misc.find_next(episode)
@pytest.mark.xfail
def test_find_offset_ffmpeg(intro_file):
x = misc.find_offset_ffmpeg(intro_file)
@@ -84,6 +102,10 @@ def test_choose(monkeypatch, mocker):
assert some[0].title == 1
assert some[1].title == 7
with mocker.patch('click.prompt', side_effect=['1000', '-1:']):
some = misc.choose('select', l, 'title')
assert some[0].title == 9
def test_to_time():
assert misc.to_time(-1) == '00:00'