layerindex-web/rrs/models.py
Mariano Lopez 0d295280da models.py: Code cleanup
Cleaned up the class Raw to have better readability.
Also added docstring to methods in this class.

Signed-off-by: Mariano Lopez <mariano.lopez@linux.intel.com>
2015-07-13 15:31:28 -05:00

551 lines
18 KiB
Python

# rrs-web - model definitions
#
# Copyright (C) 2014 Intel Corporation
#
# Licensed under the MIT license, see COPYING.MIT for details
import sys
import os
import os.path
sys.path.insert(0, os.path.realpath(os.path.join(os.path.dirname(__file__), '../')))
from datetime import date
from django.db import models
from django.db import connection
from django.db.models.query import Q
from layerindex.models import Recipe
class Release(models.Model):
name = models.CharField(max_length=100, unique=True)
start_date = models.DateField(db_index=True)
end_date = models.DateField(db_index=True)
@staticmethod
def get_by_date(date):
release_qry = Release.objects.filter(start_date__lte = date,
end_date__gte = date).order_by('-end_date')
if release_qry:
return release_qry[0]
else:
return None
@staticmethod
def get_current():
current = date.today()
current_release = Release.get_by_date(current)
return current_release or Release.objects.filter().order_by('-end_date')[0]
def __unicode__(self):
return '%s' % (self.name)
class Milestone(models.Model):
release = models.ForeignKey(Release)
name = models.CharField(max_length=100)
start_date = models.DateField(db_index=True)
end_date = models.DateField(db_index=True)
class Meta:
unique_together = ('release', 'name',)
""" Get milestones, filtering don't exist yet and ordering """
@staticmethod
def get_by_release_name(release_name):
milestones = []
today = date.today()
mall = Milestone.objects.get(release__name = release_name, name = 'All')
if mall:
milestones.append(mall)
mqry = Milestone.objects.filter(release__name = release_name).order_by('-end_date')
for m in mqry:
if m.name == 'All':
continue
if m.start_date > today:
continue
milestones.append(m)
return milestones
""" Get milestone by release and date """
@staticmethod
def get_by_release_and_date(release, date):
milestone_set = Milestone.objects.filter(release = release,
start_date__lte = date, end_date__gte = date). \
exclude(name = 'All').order_by('-end_date')
if milestone_set:
return milestone_set[0]
else:
return None
""" Get current milestone """
@staticmethod
def get_current(release):
current_milestone = None
current_date = date.today()
mqry = Milestone.objects.filter(release = release, start_date__lte = current_date,
end_date__gte = current_date).exclude(name = 'All').order_by('-end_date')
if mqry:
current_milestone = mqry[0]
else:
current_milestone = Milestone.objects.filter(release = release). \
order_by('-end_date')[0]
return current_milestone
""" Get milestone intervals by release """
@staticmethod
def get_milestone_intervals(release):
milestones = Milestone.objects.filter(release = release)
milestone_dir = {}
for m in milestones:
if "All" in m.name:
continue
milestone_dir[m.name] = {}
milestone_dir[m.name]['start_date'] = m.start_date
milestone_dir[m.name]['end_date'] = m.end_date
return milestone_dir
""" Get week intervals from start and end of Milestone """
def get_week_intervals(self):
from datetime import timedelta
weeks = {}
week_delta = timedelta(weeks=1)
week_no = 1
current_date = self.start_date
while True:
if current_date >= self.end_date:
break;
weeks[week_no] = {}
weeks[week_no]['start_date'] = current_date
weeks[week_no]['end_date'] = current_date + week_delta
current_date += week_delta
week_no += 1
return weeks
def __unicode__(self):
return '%s%s' % (self.release.name, self.name)
class Maintainer(models.Model):
name = models.CharField(max_length=255, unique=True)
email = models.CharField(max_length=255, blank=True)
"""
Create maintainer if no exist else update email.
Return Maintainer.
"""
@staticmethod
def create_or_update(name, email):
try:
m = Maintainer.objects.get(name = name)
m.email = email
except Maintainer.DoesNotExist:
m = Maintainer()
m.name = name
m.email = email
m.save()
return m
class Meta:
ordering = ["name"]
def __unicode__(self):
return "%s <%s>" % (self.name, self.email)
class RecipeMaintainerHistory(models.Model):
title = models.CharField(max_length=255, blank=True)
date = models.DateTimeField(db_index=True)
author = models.ForeignKey(Maintainer)
sha1 = models.CharField(max_length=64, unique=True)
@staticmethod
def get_last():
rmh_qry = RecipeMaintainerHistory.objects.filter().order_by('-date')
if rmh_qry:
return rmh_qry[0]
else:
return None
@staticmethod
def get_by_end_date(end_date):
rmh_qry = RecipeMaintainerHistory.objects.filter(
date__lte = end_date).order_by('-date')
if rmh_qry:
return rmh_qry[0]
rmh_qry = RecipeMaintainerHistory.objects.filter(
).order_by('date')
if rmh_qry:
return rmh_qry[0]
else:
return None
def __unicode__(self):
return "%s: %s, %s" % (self.date, self.author.name, self.sha1[:10])
class RecipeMaintainer(models.Model):
recipe = models.ForeignKey(Recipe)
maintainer = models.ForeignKey(Maintainer)
history = models.ForeignKey(RecipeMaintainerHistory)
@staticmethod
def get_maintainer_by_recipe_and_history(recipe, history):
qry = RecipeMaintainer.objects.filter(recipe = recipe,
history = history)
if qry:
return qry[0].maintainer
else:
return None
def __unicode__(self):
return "%s: %s <%s>" % (self.recipe.pn, self.maintainer.name,
self.maintainer.email)
class RecipeUpstreamHistory(models.Model):
start_date = models.DateTimeField(db_index=True)
end_date = models.DateTimeField(db_index=True)
@staticmethod
def get_last_by_date_range(start, end):
history = RecipeUpstreamHistory.objects.filter(start_date__gte = start,
start_date__lte = end).order_by('-start_date')
if history:
return history[0]
else:
return None
@staticmethod
def get_first_by_date_range(start, end):
history = RecipeUpstreamHistory.objects.filter(start_date__gte = start,
start_date__lte = end).order_by('start_date')
if history:
return history[0]
else:
return None
@staticmethod
def get_last():
history = RecipeUpstreamHistory.objects.filter().order_by('-start_date')
if history:
return history[0]
else:
return None
def __unicode__(self):
return '%s: %s' % (self.id, self.start_date)
class RecipeUpstream(models.Model):
RECIPE_UPSTREAM_STATUS_CHOICES = (
('A', 'All'),
('N', 'Not updated'),
('C', 'Can\'t be updated'),
('Y', 'Up-to-date'),
('D', 'Downgrade'),
('U', 'Unknown'),
)
RECIPE_UPSTREAM_STATUS_CHOICES_DICT = dict(RECIPE_UPSTREAM_STATUS_CHOICES)
RECIPE_UPSTREAM_TYPE_CHOICES = (
('A', 'Automatic'),
('M', 'Manual'),
)
RECIPE_UPSTREAM_TYPE_CHOICES_DICT = dict(RECIPE_UPSTREAM_TYPE_CHOICES)
recipe = models.ForeignKey(Recipe)
history = models.ForeignKey(RecipeUpstreamHistory)
version = models.CharField(max_length=100, blank=True)
type = models.CharField(max_length=1, choices=RECIPE_UPSTREAM_TYPE_CHOICES, blank=True, db_index=True)
status = models.CharField(max_length=1, choices=RECIPE_UPSTREAM_STATUS_CHOICES, blank=True, db_index=True)
no_update_reason = models.CharField(max_length=255, blank=True, db_index=True)
date = models.DateTimeField(db_index=True)
@staticmethod
def get_all_recipes(history):
qry = RecipeUpstream.objects.filter(history = history)
return qry
@staticmethod
def get_recipes_not_updated(history):
qry = RecipeUpstream.objects.filter(history = history, status = 'N',
no_update_reason = '').order_by('pn')
return qry
@staticmethod
def get_recipes_cant_be_updated(history):
qry = RecipeUpstream.objects.filter(history = history, status = 'N') \
.exclude(no_update_reason = '').order_by('pn')
return qry
@staticmethod
def get_recipes_up_to_date(history):
qry = RecipeUpstream.objects.filter(history = history, status = 'Y' \
).order_by('pn')
return qry
@staticmethod
def get_recipes_unknown(history):
qry = RecipeUpstream.objects.filter(history = history,
status__in = ['U', 'D']).order_by('pn')
return qry
@staticmethod
def get_by_recipe_and_history(recipe, history):
ru = RecipeUpstream.objects.filter(recipe = recipe, history = history)
return ru[0] if ru else None
def needs_upgrade(self):
if self.status == 'N':
return True
else:
return False
def __unicode__(self):
return '%s: (%s, %s, %s)' % (self.recipe.pn, self.status,
self.version, self.date)
class RecipeDistro(models.Model):
recipe = models.ForeignKey(Recipe)
distro = models.CharField(max_length=100, blank=True)
alias = models.CharField(max_length=100, blank=True)
def __unicode__(self):
return '%s: %s' % (self.recipe.pn, self.distro)
@staticmethod
def get_distros_by_recipe(recipe):
recipe_distros = []
query = RecipeDistro.objects.filter(recipe = recipe).order_by('distro')
for q in query:
recipe_distros.append(q.distro)
return recipe_distros
class RecipeUpgrade(models.Model):
recipe = models.ForeignKey(Recipe)
maintainer = models.ForeignKey(Maintainer, blank=True)
sha1 = models.CharField(max_length=40, blank=True)
title = models.CharField(max_length=1024, blank=True)
version = models.CharField(max_length=100, blank=True)
author_date = models.DateTimeField(db_index=True)
commit_date = models.DateTimeField(db_index=True)
@staticmethod
def get_by_recipe_and_date(recipe, end_date):
ru = RecipeUpgrade.objects.filter(recipe = recipe,
commit_date__lte = end_date)
return ru[0] if ru else None
def short_sha1(self):
return self.sha1[0:6]
def commit_url(self):
web_interface_url = self.recipe.layerbranch.layer.vcs_web_url
return web_interface_url + "/commit/?id=" + self.sha1
def __unicode__(self):
return '%s: (%s, %s)' % (self.recipe.pn, self.version,
self.commit_date)
class Raw():
""" Raw SQL call to improve performance
Table abbrevations:
re: Recipe
ma: Maintainer
reup: Recipe Upstream
reupg: Recipe Ugrade
rema: Recipe Maintainer
remahi: Recipe Maintainer History
"""
@staticmethod
def get_re_all():
""" Get all Recipes """
cur = connection.cursor()
cur.execute("""SELECT id, pn, pv, summary
FROM layerindex_Recipe;
""")
return Raw.dictfetchall(cur)
@staticmethod
def get_re_by_mantainer_and_date(maintainer, date_id):
""" Get Recipes based on Maintainer and Recipe Maintainer History """
recipes = []
cur = connection.cursor()
cur.execute("""SELECT DISTINCT rema.recipe_id
FROM rrs_RecipeMaintainer AS rema
INNER JOIN rrs_maintainer AS ma
ON rema.maintainer_id = ma.id
WHERE rema.history_id = %s
AND ma.name = %s;
""", [date_id, maintainer])
for re in cur.fetchall():
recipes.append(re[0])
return recipes
@staticmethod
def get_ma_by_recipes_and_date(recipes_id, date_id=None):
""" Get Maintainer based on Recipes and Recipe Upstream History """
stats = []
recipes = str(recipes_id).strip('[]')
if date_id:
qry = """SELECT rema.recipe_id, ma.name
FROM rrs_RecipeMaintainer AS rema
INNER JOIN rrs_Maintainer AS ma
ON rema.maintainer_id = ma.id"""
qry += "\nWHERE rema.history_id = '%s'" % str(date_id)
qry += "\nAND rema.recipe_id IN (%s);" % recipes
cur = connection.cursor()
cur.execute(qry)
stats = Raw.dictfetchall(cur)
return stats
@staticmethod
def get_reup_by_recipes_and_date(recipes_id, date_id=None):
""" Get Recipe Upstream based on Recipes and Recipe Upstream History """
stats = []
recipes = str(recipes_id).strip('[]')
if date_id:
qry = """SELECT recipe_id, status, no_update_reason, version
FROM rrs_RecipeUpstream"""
qry += "\nWHERE history_id = '%s'" % str(date_id)
qry += "\nAND recipe_id IN (%s);" % recipes
cur = connection.cursor()
cur.execute(qry)
stats = Raw.dictfetchall(cur)
return stats
@staticmethod
def get_reup_by_last_updated(date):
""" Get last time the Recipes were upgraded """
cur = connection.cursor()
cur.execute("""SELECT te.recipe_id, te.status, te.date, te.rownum FROM(
SELECT recipe_id, status, date, ROW_NUMBER() OVER(
PARTITION BY recipe_id
ORDER BY date DESC
) AS rownum
FROM rrs_RecipeUpstream
WHERE status = 'Y'
AND date <= %s) AS te
WHERE te.rownum = 1;
""", [date])
return Raw.dictfetchall(cur)
@staticmethod
def get_reup_by_date(date_id):
""" Get Recipes not up to date based on Recipe Upstream History """
cur = connection.cursor()
cur.execute("""SELECT DISTINCT recipe_id
FROM rrs_RecipeUpstream
WHERE status = 'N'
AND history_id = %s
""", [date_id])
return [i[0] for i in cur.fetchall()]
@staticmethod
def get_reupg_by_date(date):
""" Get info for Recipes for the milestone """
cur = connection.cursor()
cur.execute("""SELECT re.id, re.pn, re.summary, te.version, rownum FROM (
SELECT recipe_id, version, commit_date, ROW_NUMBER() OVER(
PARTITION BY recipe_id
ORDER BY commit_date DESC
) AS rownum
FROM rrs_RecipeUpgrade
WHERE commit_date <= %s) AS te
INNER JOIN layerindex_Recipe AS re
ON te.recipe_id = re.id
WHERE rownum = 1
ORDER BY re.pn;
""", [date])
return Raw.dictfetchall(cur)
@staticmethod
def get_reupg_by_dates(start_date, end_date):
""" Get Recipe Upgrade for the milestone """
cur = connection.cursor()
cur.execute("""SELECT id, recipe_id, maintainer_id, author_date, commit_date
FROM rrs_RecipeUpgrade
WHERE commit_date >= %s
AND commit_date <= %s
ORDER BY commit_date DESC;
""", [start_date, end_date])
return Raw.dictfetchall(cur)
@staticmethod
def get_reupg_by_dates_and_recipes(start_date, end_date, recipes_id):
""" Get Recipe Upgrade for the milestone based on Recipes """
recipes = str(recipes_id).strip('[]')
cur = connection.cursor()
qry = """SELECT DISTINCT recipe_id
FROM rrs_RecipeUpgrade"""
qry += "\nWHERE commit_date >= '%s'" % str(start_date)
qry += "\nAND commit_date <= '%s'" % str(end_date)
qry += "\nAND recipe_id IN (%s);" % recipes
cur.execute(qry)
return Raw.dictfetchall(cur)
@staticmethod
def get_remahi_by_end_date(date):
""" Get the latest Recipe Maintainer History for the milestone """
cur = connection.cursor()
cur.execute("""SELECT id
FROM rrs_RecipeMaintainerHistory
WHERE date <= %s
ORDER BY date DESC
LIMIT 1;
""", [str(date)])
ret = cur.fetchone()
if not ret:
cur.execute("""SELECT id
FROM rrs_RecipeMaintainerHistory
ORDER BY date
LIMIT 1;
""")
ret = cur.fetchone()
return ret
@staticmethod
def dictfetchall(cursor):
"Returns all rows from a cursor as a dict"
desc = cursor.description
return [
dict(zip([col[0] for col in desc], row))
for row in cursor.fetchall()
]