Refactor totals.py

- add main
- move algorithm to function
- use __future__ module
- introduce new style formatting
- restructure test_totals.t accordingly
- add test for empty range
- add tests for colored output
This commit is contained in:
Thomas Lauf 2018-05-10 19:22:34 +02:00
parent d5e2fd8f62
commit b5acfed98f
2 changed files with 388 additions and 169 deletions

View file

@ -25,124 +25,135 @@
#
###############################################################################
from __future__ import print_function
import sys
import json
import datetime
from dateutil import tz
DATEFORMAT = "%Y%m%dT%H%M%SZ"
def format_seconds(seconds):
"""Convert seconds to a formatted string
Convert seconds: 3661
To formatted: 1:01:01
To formatted: " 1:01:01"
"""
hours = int(seconds / 3600)
minutes = int(seconds % 3600) / 60
minutes = int(seconds % 3600 / 60)
seconds = seconds % 60
return "%4d:%02d:%02d" % (hours, minutes, seconds)
return "{:4d}:{:02d}:{:02d}".format(hours, minutes, seconds)
DATEFORMAT = "%Y%m%dT%H%M%SZ"
def calculate_totals(input_stream):
from_zone = tz.tzutc()
to_zone = tz.tzlocal()
# Extract the configuration settings.
header = 1
configuration = dict()
body = ""
for line in sys.stdin:
if header:
if line == "\n":
header = 0
else:
fields = line.strip().split(": ", 2)
if len(fields) == 2:
configuration[fields[0]] = fields[1]
# Extract the configuration settings.
header = 1
configuration = dict()
body = ""
for line in input_stream:
if header:
if line == "\n":
header = 0
else:
configuration[fields[0]] = ""
else:
body += line
# Sum the second tracked by tag.
totals = dict()
untagged = None
from_zone = tz.tzutc()
to_zone = tz.tzlocal()
j = json.loads(body)
for object in j:
start = datetime.datetime.strptime(object["start"], DATEFORMAT)
if "end" in object:
end = datetime.datetime.strptime(object["end"], DATEFORMAT)
else:
end = datetime.datetime.utcnow()
tracked = end - start
if "tags" not in object or object["tags"] == []:
if untagged is None:
untagged = tracked
fields = line.strip().split(": ", 2)
if len(fields) == 2:
configuration[fields[0]] = fields[1]
else:
configuration[fields[0]] = ""
else:
untagged += tracked
else:
for tag in object["tags"]:
if tag in totals:
totals[tag] += tracked
body += line
# Sum the seconds tracked by tag.
totals = dict()
untagged = None
j = json.loads(body)
for object in j:
start = datetime.datetime.strptime(object["start"], DATEFORMAT)
if "end" in object:
end = datetime.datetime.strptime(object["end"], DATEFORMAT)
else:
end = datetime.datetime.utcnow()
tracked = end - start
if "tags" not in object or object["tags"] == []:
if untagged is None:
untagged = tracked
else:
totals[tag] = tracked
untagged += tracked
else:
for tag in object["tags"]:
if tag in totals:
totals[tag] += tracked
else:
totals[tag] = tracked
# Determine largest tag width.
max_width = len("Total")
for tag in totals:
if len(tag) > max_width:
max_width = len(tag)
# Determine largest tag width.
max_width = len("Total")
for tag in totals:
if len(tag) > max_width:
max_width = len(tag)
if "temp.report.start" not in configuration:
print("There is no data in the database")
exit()
if "temp.report.start" not in configuration:
return ["There is no data in the database"]
start_utc = datetime.datetime.strptime(configuration["temp.report.start"], DATEFORMAT)
start_utc = start_utc.replace(tzinfo=from_zone)
start = start_utc.astimezone(to_zone)
start_utc = datetime.datetime.strptime(configuration["temp.report.start"], DATEFORMAT)
start_utc = start_utc.replace(tzinfo=from_zone)
start = start_utc.astimezone(to_zone)
if "temp.report.end" in configuration:
end_utc = datetime.datetime.strptime(configuration["temp.report.end"], DATEFORMAT)
end_utc = end_utc.replace(tzinfo=from_zone)
end = end_utc.astimezone(to_zone)
else:
end = datetime.datetime.now()
if "temp.report.end" in configuration:
end_utc = datetime.datetime.strptime(configuration["temp.report.end"], DATEFORMAT)
end_utc = end_utc.replace(tzinfo=from_zone)
end = end_utc.astimezone(to_zone)
else:
end = datetime.datetime.now()
if len(totals) == 0 and untagged is None:
return ["No data in the range {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}".format(start, end)]
if max_width > 0:
# Compose report header.
print("\nTotal by Tag, for {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}\n".format(start, end))
output = [
"",
"Total by Tag, for {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}".format(start, end),
""
]
# Compose table header.
if configuration["color"] == "on":
print("{:{width}} {:>10}".format("Tag", "Total", width=max_width))
output.append("{:{width}} {:>10}".format("Tag", "Total", width=max_width))
else:
print("{:{width}} {:>10}".format("Tag", "Total", width=max_width))
print("{} {}".format("-" * max_width, "----------"))
output.append("{:{width}} {:>10}".format("Tag", "Total", width=max_width))
output.append("{} {}".format("-" * max_width, "----------"))
# Compose table rows.
grand_total = 0
for tag in sorted(totals):
formatted = format_seconds(totals[tag].seconds)
grand_total += totals[tag].seconds
print("%-*s %10s" % (max_width, tag, formatted))
output.append("{:{width}} {:10}".format(tag, formatted, width=max_width))
if untagged is not None:
formatted = format_seconds(untagged.seconds)
grand_total += untagged.seconds
print("%-*s %10s" % (max_width, "", formatted))
output.append("{:{width}} {:10}".format("", formatted, width=max_width))
# Compose total.
if configuration["color"] == "on":
print("{} {}".format(" " * max_width, " "))
output.append("{} {}".format(" " * max_width, " "))
else:
print("{} {}".format(" " * max_width, "----------"))
output.append("{} {}".format(" " * max_width, "----------"))
print("%-*s %10s" % (max_width, "Total", format_seconds(grand_total)))
output.append("{:{width}} {:10}".format("Total", format_seconds(grand_total), width=max_width))
output.append("")
else:
print("No data in the range {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}".format(start, end))
return output
if __name__ == "__main__":
for line in calculate_totals(sys.stdin):
print(line)

View file

@ -27,7 +27,6 @@
###############################################################################
import os
import subprocess
import sys
import unittest
@ -35,32 +34,27 @@ import datetime
# Ensure python finds the local simpletap module
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'ext'))
from basetest import TestCase
from totals import *
class TestTotals(TestCase):
def setUp(self):
current_dir = os.path.dirname(os.path.abspath(__file__))
self.process = subprocess.Popen([os.path.join(current_dir, '../ext/totals.py')],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def test_totals_with_empty_database(self):
"""totals extension should report error on empty database"""
out, err = self.process.communicate(input="""\
color: off
debug: off
temp.report.start:
temp.report.end:
input_stream = [
'color: off\n',
'debug: on\n',
'temp.report.start: \n',
'temp.report.end: \n',
'\n',
'[]',
]
[
]
""")
out = calculate_totals(input_stream)
self.assertEqual('There is no data in the database\n', out)
self.assertEqual('', err)
self.assertEqual(['There is no data in the database'], out)
def test_totals_with_filled_database(self):
"""totals extension should print report for filled database"""
@ -70,27 +64,51 @@ temp.report.end:
now_utc = now.utcnow()
one_hour_before_utc = now_utc - datetime.timedelta(hours=1)
out, err = self.process.communicate(input="""\
color: off
debug: off
temp.report.start: {0:%Y%m%dT%H%M%S}Z
temp.report.end: {1:%Y%m%dT%H%M%S}Z
input_stream = [
'color: off\n',
'debug: on\n',
'temp.report.start: {:%Y%m%dT%H%M%S}Z\n'.format(one_hour_before_utc),
'temp.report.end: {:%Y%m%dT%H%M%S}Z\n'.format(now_utc),
'\n',
'[{{"start":"{:%Y%m%dT%H%M%S}Z","end":"{:%Y%m%dT%H%M%S}Z","tags":["foo"]}}]'.format(one_hour_before_utc, now_utc)
]
[
{{"start":"{0:%Y%m%dT%H%M%S}Z","end":"{1:%Y%m%dT%H%M%S}Z","tags":["foo"]}}
]
""".format(one_hour_before_utc, now_utc))
out = calculate_totals(input_stream)
self.assertRegexpMatches(out, """
Total by Tag, for {0:%Y-%m-%d %H:%M}:\d{{2}} - {1:%Y-%m-%d %H:%M}:\d{{2}}
self.assertEqual(
[
'',
'Total by Tag, for {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}'.format(one_hour_before, now),
'',
'Tag Total',
'----- ----------',
'foo 1:00:00',
' ----------',
'Total 1:00:00',
'',
],
out)
Tag Total
----- ----------
foo 1:00:0[01]
----------
Total 1:00:0[01]
""".format(one_hour_before, now))
self.assertEqual('', err)
def test_totals_with_emtpy_range(self):
"""totals extension should report error on emtpy range"""
now = datetime.datetime.now()
one_hour_before = now - datetime.timedelta(hours=1)
now_utc = now.utcnow()
one_hour_before_utc = now_utc - datetime.timedelta(hours=1)
input_stream = [
'color: off\n',
'debug: on\n',
'temp.report.start: {:%Y%m%dT%H%M%S}Z\n'.format(one_hour_before_utc),
'temp.report.end: {:%Y%m%dT%H%M%S}Z\n'.format(now_utc),
'\n',
'[]',
]
out = calculate_totals(input_stream)
self.assertEqual(['No data in the range {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}'.format(one_hour_before, now)], out)
def test_totals_with_interval_without_tags(self):
"""totals extension should handle interval without tags"""
@ -100,27 +118,30 @@ Total 1:00:0[01]
now_utc = now.utcnow()
one_hour_before_utc = now_utc - datetime.timedelta(hours=1)
out, err = self.process.communicate(input="""\
color: off
debug: off
temp.report.start: {0:%Y%m%dT%H%M%S}Z
temp.report.end: {1:%Y%m%dT%H%M%S}Z
input_stream = [
'color: off\n',
'debug: on\n',
'temp.report.start: {:%Y%m%dT%H%M%S}Z\n'.format(one_hour_before_utc),
'temp.report.end: {:%Y%m%dT%H%M%S}Z\n'.format(now_utc),
'\n',
'[{{"start":"{:%Y%m%dT%H%M%S}Z","end":"{:%Y%m%dT%H%M%S}Z"}}]'.format(one_hour_before_utc, now_utc)
]
[
{{"start":"{0:%Y%m%dT%H%M%S}Z","end":"{1:%Y%m%dT%H%M%S}Z"}}
]
""".format(one_hour_before_utc, now_utc))
out = calculate_totals(input_stream)
self.assertRegexpMatches(out, """
Total by Tag, for {0:%Y-%m-%d %H:%M}:\d{{2}} - {1:%Y-%m-%d %H:%M}:\d{{2}}
Tag Total
----- ----------
1:00:0[01]
----------
Total 1:00:0[01]
""".format(one_hour_before, now))
self.assertEqual('', err)
self.assertEqual(
[
'',
'Total by Tag, for {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}'.format(one_hour_before, now),
'',
'Tag Total',
'----- ----------',
' 1:00:00',
' ----------',
'Total 1:00:00',
'',
],
out)
def test_totals_with_interval_with_empty_tag_list(self):
"""totals extension should handle interval with empty tag list"""
@ -130,27 +151,30 @@ Total 1:00:0[01]
now_utc = now.utcnow()
one_hour_before_utc = now_utc - datetime.timedelta(hours=1)
out, err = self.process.communicate(input="""\
color: off
debug: off
temp.report.start: {0:%Y%m%dT%H%M%S}Z
temp.report.end: {1:%Y%m%dT%H%M%S}Z
input_stream = [
'color: off\n',
'debug: on\n',
'temp.report.start: {:%Y%m%dT%H%M%S}Z\n'.format(one_hour_before_utc),
'temp.report.end: {:%Y%m%dT%H%M%S}Z\n'.format(now_utc),
'\n',
'[{{"start":"{:%Y%m%dT%H%M%S}Z","end":"{:%Y%m%dT%H%M%S}Z","tags":[]}}]'.format(one_hour_before_utc, now_utc)
]
[
{{"start":"{0:%Y%m%dT%H%M%S}Z","end":"{1:%Y%m%dT%H%M%S}Z", "tags":[]}}
]
""".format(one_hour_before_utc, now_utc))
out = calculate_totals(input_stream)
self.assertRegexpMatches(out, """
Total by Tag, for {0:%Y-%m-%d %H:%M}:\d{{2}} - {1:%Y-%m-%d %H:%M}:\d{{2}}
Tag Total
----- ----------
1:00:0[01]
----------
Total 1:00:0[01]
""".format(one_hour_before, now))
self.assertEqual('', err)
self.assertEqual(
[
'',
'Total by Tag, for {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}'.format(one_hour_before, now),
'',
'Tag Total',
'----- ----------',
' 1:00:00',
' ----------',
'Total 1:00:00',
'',
],
out)
def test_totals_with_open_interval(self):
"""totals extension should handle open interval"""
@ -160,31 +184,215 @@ Total 1:00:0[01]
now_utc = now.utcnow()
one_hour_before_utc = now_utc - datetime.timedelta(hours=1)
out, err = self.process.communicate(input="""\
color: off
debug: off
temp.report.start: {0:%Y%m%dT%H%M%S}Z
temp.report.end:
input_stream = [
'color: off\n',
'debug: off\n',
'temp.report.start: {:%Y%m%dT%H%M%S}Z\n'.format(one_hour_before_utc),
'temp.report.end: \n',
'\n',
'[{{"start":"{:%Y%m%dT%H%M%S}Z","tags":["foo"]}}]'.format(one_hour_before_utc),
]
[
{{"start":"{0:%Y%m%dT%H%M%S}Z","tags":["foo"]}}
]
""".format(one_hour_before_utc))
out = calculate_totals(input_stream)
self.assertRegexpMatches(out, """
Total by Tag, for {0:%Y-%m-%d %H:%M}:\d{{2}} - {1:%Y-%m-%d %H:%M}:\d{{2}}
self.assertEqual(
[
'',
'Total by Tag, for {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}'.format(one_hour_before, now),
'',
'Tag Total',
'----- ----------',
'foo 1:00:00',
' ----------',
'Total 1:00:00',
'',
],
out)
Tag Total
----- ----------
foo 1:00:0[01]
----------
Total 1:00:0[01]
""".format(one_hour_before, now))
self.assertEqual('', err)
def test_totals_colored_with_empty_database(self):
"""totals extension should report error on empty database (colored)"""
input_stream = [
'color: on\n',
'debug: on\n',
'temp.report.start: \n',
'temp.report.end: \n',
'\n',
'[]',
]
out = calculate_totals(input_stream)
self.assertEqual(['There is no data in the database'], out)
def test_totals_colored_with_filled_database(self):
"""totals extension should print report for filled database (colored)"""
now = datetime.datetime.now()
one_hour_before = now - datetime.timedelta(hours=1)
now_utc = now.utcnow()
one_hour_before_utc = now_utc - datetime.timedelta(hours=1)
input_stream = [
'color: on\n',
'debug: on\n',
'temp.report.start: {:%Y%m%dT%H%M%S}Z\n'.format(one_hour_before_utc),
'temp.report.end: {:%Y%m%dT%H%M%S}Z\n'.format(now_utc),
'\n',
'[{{"start":"{:%Y%m%dT%H%M%S}Z","end":"{:%Y%m%dT%H%M%S}Z","tags":["foo"]}}]'.format(one_hour_before_utc, now_utc)
]
out = calculate_totals(input_stream)
self.assertEqual(
[
'',
'Total by Tag, for {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}'.format(one_hour_before, now),
'',
'Tag   Total',
'foo 1:00:00',
'  ',
'Total 1:00:00',
'',
],
out)
def test_totals_colored_with_emtpy_range(self):
"""totals extension should report error on emtpy range (colored)"""
now = datetime.datetime.now()
one_hour_before = now - datetime.timedelta(hours=1)
now_utc = now.utcnow()
one_hour_before_utc = now_utc - datetime.timedelta(hours=1)
input_stream = [
'color: on\n',
'debug: on\n',
'temp.report.start: {:%Y%m%dT%H%M%S}Z\n'.format(one_hour_before_utc),
'temp.report.end: {:%Y%m%dT%H%M%S}Z\n'.format(now_utc),
'\n',
'[]',
]
out = calculate_totals(input_stream)
self.assertEqual(['No data in the range {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}'.format(one_hour_before, now)], out)
def test_totals_colored_with_interval_without_tags(self):
"""totals extension should handle interval without tags (colored)"""
now = datetime.datetime.now()
one_hour_before = now - datetime.timedelta(hours=1)
now_utc = now.utcnow()
one_hour_before_utc = now_utc - datetime.timedelta(hours=1)
input_stream = [
'color: on\n',
'debug: on\n',
'temp.report.start: {:%Y%m%dT%H%M%S}Z\n'.format(one_hour_before_utc),
'temp.report.end: {:%Y%m%dT%H%M%S}Z\n'.format(now_utc),
'\n',
'[{{"start":"{:%Y%m%dT%H%M%S}Z","end":"{:%Y%m%dT%H%M%S}Z"}}]'.format(one_hour_before_utc, now_utc)
]
out = calculate_totals(input_stream)
self.assertEqual(
[
'',
'Total by Tag, for {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}'.format(one_hour_before, now),
'',
'Tag   Total',
' 1:00:00',
'  ',
'Total 1:00:00',
'',
],
out)
def test_totals_colored_with_interval_with_empty_tag_list(self):
"""totals extension should handle interval with empty tag list (colored)"""
now = datetime.datetime.now()
one_hour_before = now - datetime.timedelta(hours=1)
now_utc = now.utcnow()
one_hour_before_utc = now_utc - datetime.timedelta(hours=1)
input_stream = [
'color: on\n',
'debug: on\n',
'temp.report.start: {:%Y%m%dT%H%M%S}Z\n'.format(one_hour_before_utc),
'temp.report.end: {:%Y%m%dT%H%M%S}Z\n'.format(now_utc),
'\n',
'[{"start":"20160101T070000Z","end":"20160101T080000Z","tags":[]}]',
]
out = calculate_totals(input_stream)
self.assertEqual(
[
'',
'Total by Tag, for {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}'.format(one_hour_before, now),
'',
'Tag   Total',
' 1:00:00',
'  ',
'Total 1:00:00',
'',
],
out)
def test_totals_colored_with_open_interval(self):
"""totals extension should handle open interval (colored)"""
now = datetime.datetime.now()
one_hour_before = now - datetime.timedelta(hours=1)
now_utc = now.utcnow()
one_hour_before_utc = now_utc - datetime.timedelta(hours=1)
input_stream = [
'color: on\n',
'debug: off\n',
'temp.report.start: {:%Y%m%dT%H%M%S}Z\n'.format(one_hour_before_utc),
'temp.report.end: \n',
'\n',
'[{{"start":"{:%Y%m%dT%H%M%S}Z","tags":["foo"]}}]'.format(one_hour_before_utc),
]
out = calculate_totals(input_stream)
self.assertEqual(
[
'',
'Total by Tag, for {:%Y-%m-%d %H:%M:%S} - {:%Y-%m-%d %H:%M:%S}'.format(one_hour_before, now),
'',
'Tag   Total',
'foo 1:00:00',
'  ',
'Total 1:00:00',
'',
],
out)
def test_format_seconds_with_less_than_1_minute(self):
"""Test format_seconds with less than 1 minute"""
self.assertEqual(format_seconds(34), ' 0:00:34')
def test_format_seconds_with_1_minute(self):
"""Test format_seconds with 1 minute"""
self.assertEqual(format_seconds(60), ' 0:01:00')
def test_format_seconds_with_1_hour(self):
"""Test format_seconds with 1 hour"""
self.assertEqual(format_seconds(3600), ' 1:00:00')
def test_format_seconds_with_more_than_1_hour(self):
"""Test format_seconds with more than 1 hour"""
self.assertEqual(format_seconds(3645), ' 1:00:45')
if __name__ == "__main__":
if __name__ == '__main__':
from simpletap import TAPTestRunner
unittest.main(testRunner=TAPTestRunner())
# vim: ai sts=4 et sw=4 ft=python