"""Django management command that tracks a project's Maintainability Index."""
import datetime
import json
import pathlib
import tempfile
import webbrowser
from collections import OrderedDict

import plotly
from git import Repo
from progress.bar import Bar
from radon.cli import Config, MIHarvester, RawHarvester

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import timezone


class WorseMaintainabilityIndexError(Exception):
    """Raised when the current Maintainability Index is below the last recorded one."""


class Command(BaseCommand):
    """Management command entry point; see ``add_arguments`` for the flags."""

    help = "Calculate current Maintainability Index"

    def add_arguments(self, parser):
        """Register the boolean command line switches (all default to False)."""
        flags = (
            ('commit', "Commit result."),
            ('fail', "Raise Error, if MI is worse than before."),
            ('showhistory', "Show history of MI in browser."),
            ('init', "Initialize history for past commits."),
        )
        for name, help_text in flags:
            parser.add_argument(
                f'--{name}',
                action='store_true',
                dest=name,
                default=False,
                help=help_text,
            )

    def handle(self, *args, **options):
        """
        Dispatch on the given flags.

        ``--init`` and ``--showhistory`` are exclusive modes that return early
        without output. Otherwise ``--fail`` and ``--commit`` are applied (in
        that order) and the current index is returned as a string.
        """
        if options.get('init', False):
            build_for_past('bump version')
            return

        mi_calculator = MaintainabilityIndexCalculater()

        if options.get('showhistory', False):
            mi_calculator.show_history()
            return

        if options.get('fail', False):
            mi_calculator.check_for_fail()

        if options.get('commit', False):
            mi_calculator.commit_new_mi()

        return str(mi_calculator.maintainability_index)


class MaintainabilityIndexCalculater(object):
    """
    Compute, persist and visualise the Maintainability Index of the code base.

    Configuration is read from the ``RADON_MI_SETTINGS`` Django setting (a
    dict). Keys used here: ``history_file_path``, ``current_state_file_path``,
    ``paths``, ``exclude``, ``ignore`` and ``multi``.
    """

    def __init__(self):
        self.radon_settings = getattr(settings, 'RADON_MI_SETTINGS', {})
        self.history_file_path = self.radon_settings.get(
            'history_file_path', 'mi_history.json'
        )
        self.current_state_file_path = self.radon_settings.get(
            'current_state_file_path', 'mi_current_state.json'
        )
        self.__init_harvesters()

    def get_current_history(self) -> dict:
        """
        Return the recorded history, keyed by ISO timestamp.

        The history file is read at most once per instance; a missing file
        yields an empty dict. The returned dict is the cached object itself,
        so mutations by callers are visible on later calls.
        """
        # Compare against None so that an empty history is cached as well.
        cached = getattr(self, '_mi_history', None)
        if cached is not None:
            return cached

        try:
            with open(self.history_file_path, 'r') as mi_history_file:
                current_history = json.load(mi_history_file)
        except FileNotFoundError:
            current_history = {}

        self._mi_history = current_history
        return current_history

    def check_for_fail(self) -> None:
        """
        Raise if the current index is lower than the latest recorded one.

        Raises:
            WorseMaintainabilityIndexError: the index decreased compared to
                the history entry with the greatest key.
        """
        mi_history = self.get_current_history()
        if not mi_history:
            # No old data. No worse result possible.
            return

        last_maintainability_index = mi_history[max(mi_history)]['mi']
        if self.maintainability_index < last_maintainability_index:
            raise WorseMaintainabilityIndexError(
                f'From {last_maintainability_index} '
                f'to {self.maintainability_index}!'
            )

    def commit_new_mi(self) -> None:
        """
        Append the current metrics to the history and commit both state files.

        Everything that can fail while harvesting is computed before a file
        is opened for writing, so an error cannot leave a truncated history
        file behind.
        """
        mi_history = self.get_current_history()
        current_state = self.mi_harvester.as_json()

        repo = Repo.init()
        try:
            # The dict literal is fully evaluated before the assignment, so
            # the cached history stays untouched if a metric raises.
            mi_history[timezone.now().isoformat()] = {
                'sha': repo.head.commit.hexsha,
                'mi': self.maintainability_index,
                'lloc': self.lloc,
                'loc': self.loc,
                'sloc': self.sloc,
            }
            serialized_history = json.dumps(mi_history)

            with open(self.history_file_path, 'w') as mi_history_file:
                mi_history_file.write(serialized_history)
            with open(self.current_state_file_path, 'w') as mi_current_state:
                mi_current_state.write(current_state)

            index = repo.index
            index.add([self.history_file_path, self.current_state_file_path])
            index.commit('chore: Update maintainability index')
        finally:
            repo.close()

    def __init_harvesters(self) -> None:
        """Create the radon RAW and MI harvesters from the configured settings."""
        paths = self.radon_settings.get('paths', [])
        config = Config(
            min='A',
            max='C',
            exclude=self.radon_settings.get('exclude'),
            ignore=self.radon_settings.get('ignore'),
            multi=self.radon_settings.get('multi', False),
            show=True,
            sort=True,
        )
        self.raw_harvester = RawHarvester(paths, config)
        self.mi_harvester = MIHarvester(paths, config)

    @property
    def maintainability_index(self) -> float:
        """
        Return the maintainability index of the configured scope.

        1. Count logical lines of code (lloc) of every file.
        2. Calculate the Maintainability Index of every file.
        3. Average the per-file indexes, weighted by their lloc.

        Files missing either value (for example files radon could not
        analyse) are skipped. When no logical line is counted at all the
        result is 100.0 instead of a ``ZeroDivisionError``.
        NOTE(review): 100.0 is meant to mirror radon rating empty code as
        100 - confirm this is the desired value for an empty scope.
        """
        cached = getattr(self, '_maintainability_index', None)
        if cached is not None:
            return cached

        loc_data = self.loc_data
        lloc_count = 0
        weighted_mi = 0
        for file_name, mi_data in self.mi_harvester.results:
            try:
                lloc = loc_data[file_name]['lloc']
                mi = mi_data['mi']
            except KeyError:
                continue
            lloc_count += lloc
            weighted_mi += mi * lloc

        if lloc_count:
            maintainability_index = weighted_mi / lloc_count
        else:
            maintainability_index = 100.0

        self._maintainability_index = maintainability_index
        return maintainability_index

    @property
    def loc_data(self) -> dict:
        """Return the RAW harvester results as a dict keyed by file name (cached)."""
        cached = getattr(self, '_loc_data', None)
        if cached is not None:
            return cached

        loc_data = dict(self.raw_harvester.results)
        self._loc_data = loc_data
        return loc_data

    def _sum_raw_metric(self, metric: str) -> int:
        """Sum one RAW metric over all files; files lacking it count as 0."""
        return sum(
            file_loc_data.get(metric, 0) for file_loc_data in self.loc_data.values()
        )

    @property
    def loc(self) -> int:
        """Return lines of code of given scope."""
        return self._sum_raw_metric('loc')

    @property
    def lloc(self) -> int:
        """Return logical lines of code of given scope."""
        return self._sum_raw_metric('lloc')

    @property
    def sloc(self) -> int:
        """Return source lines of code of given scope."""
        return self._sum_raw_metric('sloc')

    @property
    def comments(self) -> int:
        """Return comment lines of code of given scope."""
        return self._sum_raw_metric('comments')

    def show_history(self) -> None:
        """Plot the recorded history into a temporary HTML file and open it in the browser."""
        mi_history = self.get_current_history()
        mi_history = OrderedDict(sorted(mi_history.items(), key=lambda x: x[0]))

        # Keep the (empty) file instead of deleting it and reusing its name:
        # that reserves the path until plotly writes the report into it.
        with tempfile.NamedTemporaryFile(
            prefix='mi_history', suffix='.html', delete=False
        ) as temporary_file:
            file_name = temporary_file.name

        timestamps = list(mi_history.keys())
        # (history key, trace name, extra Scatter arguments); the line counts
        # are drawn against the secondary axis on the right.
        trace_specs = (
            ('mi', 'MI', {}),
            ('loc', 'LOC', {'yaxis': 'y2'}),
            ('lloc', 'LLOC', {'yaxis': 'y2'}),
            ('sloc', 'SLOC', {'yaxis': 'y2'}),
        )
        traces = [
            plotly.graph_objs.Scatter(
                x=timestamps,
                y=[values[key] for values in mi_history.values()],
                name=name,
                **extra,
            )
            for key, name, extra in trace_specs
        ]

        plotly.offline.plot(
            {
                'data': traces,
                'layout': plotly.graph_objs.Layout(
                    yaxis={'rangemode': 'tozero'},
                    yaxis2={
                        'side': 'right',
                        'overlaying': 'y',
                        'showgrid': False,
                        'rangemode': 'tozero',
                    },
                ),
            },
            auto_open=False,
            filename=file_name,
        )

        # as_uri() yields a well-formed file:// URL on POSIX and on Windows.
        webbrowser.open_new(pathlib.Path(file_name).as_uri())


def build_for_past(commit_lookup_message: str) -> None:
    """
    Rebuild the history file from past commits of the active branch.

    Every commit whose message contains ``commit_lookup_message`` is checked
    out and measured. The history file is overwritten with the collected
    entries - including the partial result if a commit fails, in which case
    the error is re-raised after the original branch has been restored.

    Raises:
        Exception: the working tree is dirty.
    """
    repo = Repo.init()
    try:
        current_branch = repo.active_branch
        if repo.is_dirty():
            raise Exception('Dirty Repo! Stash/Commit unchanged files!')

        revisions = [
            (
                commit.hexsha,  # key
                datetime.datetime.fromtimestamp(commit.committed_date),  # date
            )
            for commit in repo.iter_commits(current_branch)
            if commit_lookup_message in commit.message
        ]

        history = {}
        bar = Bar("Processing", max=len(revisions))
        try:
            for sha, committed_at in revisions:
                repo.git.checkout(sha)
                mic = MaintainabilityIndexCalculater()
                history[committed_at.isoformat()] = {
                    'sha': sha,
                    'mi': mic.maintainability_index,
                    'lloc': mic.lloc,
                    'loc': mic.loc,
                    'sloc': mic.sloc,
                }
                bar.next()
        finally:
            # Runs on success and on failure: restore the branch first, then
            # persist whatever was collected so far.
            bar.finish()
            repo.git.checkout(current_branch)

            radon_settings = getattr(settings, 'RADON_MI_SETTINGS', {})
            history_file_path = radon_settings.get(
                'history_file_path', 'mi_history.json'
            )
            with open(history_file_path, 'w') as f:
                f.write(json.dumps(history))
    finally:
        repo.close()