[Date Prev][Date Next] [Thread Prev][Thread Next] [Date Index] [Thread Index]

Bug#762934: Models: Refactor models to contain only ORM abstraction



Since my last patch there was some discussion on how to better design this.
In the end instead of just moving the queries out of the models it was
decided to have 3 files:

1) models.py for the ORM layer
2) navigation.py for all the classes not in the ORM layer
3) queries.py for all the queries.

Hence this patch implements this approach and refactors the two views.


From 0af68510211e854ac1de432cd181314266dddbc2 Mon Sep 17 00:00:00 2001
From: Orestis Ioannou <orestis@oioannou.com>
Date: Mon, 16 Mar 2015 22:51:26 +0100
Subject: [PATCH] web app: refactor models and views to match new abstraction
 layer

Add navigation.py for the non orm models
Add query.py for all the sqlalchemy queries
Add test suite for the queries
Refactor views to match new abstraction layer
---
 debsources/app/sources/views.py  |  33 ++--
 debsources/app/views.py          | 104 +++--------
 debsources/models.py             | 385 +--------------------------------------
 debsources/navigation.py         | 209 +++++++++++++++++++++
 debsources/query.py              | 311 +++++++++++++++++++++++++++++++
 debsources/tests/test_queries.py |  70 +++++++
 6 files changed, 634 insertions(+), 478 deletions(-)
 create mode 100644 debsources/navigation.py
 create mode 100644 debsources/query.py
 create mode 100644 debsources/tests/test_queries.py

diff --git a/debsources/app/sources/views.py b/debsources/app/sources/views.py
index 59e85f3..6a956d8 100644
--- a/debsources/app/sources/views.py
+++ b/debsources/app/sources/views.py
@@ -1,7 +1,6 @@
 import os
 
 from flask import current_app, request, jsonify, url_for
-from sqlalchemy import func as sql_func
 from debian.debian_support import version_compare
 
 from debsources.excepts import (
@@ -9,10 +8,12 @@ from debsources.excepts import (
     InvalidPackageOrVersionError)
 from debsources.consts import SLOCCOUNT_LANGUAGES
 from debsources import statistics
-from debsources.models import (
-    PackageName, SourceFile, Checksum, Directory, Location, SuiteInfo,
-    SuiteAlias)
 
+from debsources.models import SuiteAlias
+from debsources.navigation import (Location, Directory,
+                                   SourceFile)
+
+import debsources.query as qry
 from ..views import GeneralView, app, session
 from ..extract_stats import extract_stats
 from ..infobox import Infobox
@@ -30,7 +31,7 @@ class StatsView(GeneralView):
                                   "stats.data")
         res = extract_stats(filename=stats_file,
                             filter_suites=["debian_" + suite])
-        info = session.query(SuiteInfo).filter(SuiteInfo.name == suite).first()
+        info = qry.get_suite_info(session, suite)
 
         return dict(results=res,
                     languages=SLOCCOUNT_LANGUAGES,
@@ -69,7 +70,7 @@ class SourceView(GeneralView):
             suite = ""
         # we list the version with suites it belongs to
         try:
-            versions_w_suites = PackageName.list_versions_w_suites(
+            versions_w_suites = qry.list_versions_w_suites(
                 session, packagename, suite)
         except InvalidPackageOrVersionError:
             raise Http404Error("%s not found" % packagename)
@@ -80,7 +81,7 @@ class SourceView(GeneralView):
             self.render_func = bind_render(
                 'sources/source_package.html',
                 # we simply add pathl (for use with "You are here:")
-                pathl=Location.get_path_links('.source', path_to))
+                pathl=qry.get_path_links('.source', path_to))
 
         return dict(type="package",
                     package=packagename,
@@ -155,7 +156,7 @@ class SourceView(GeneralView):
                 subdirs=filter(lambda x: x['type'] == "directory", content),
                 subfiles=filter(lambda x: x['type'] == "file", content),
                 nb_hidden_files=sum(1 for f in content if f['hidden']),
-                pathl=Location.get_path_links(".source", path),)
+                pathl=qry.get_path_links(".source", path),)
 
         return dict(type="directory",
                     directory=location.get_deepest_element(),
@@ -171,9 +172,9 @@ class SourceView(GeneralView):
         """
         file_ = SourceFile(location)
         checksum = file_.get_sha256sum(session)
-        number_of_duplicates = (session.query(sql_func.count(Checksum.id))
-                                .filter(Checksum.sha256 == checksum)
-                                .first()[0])
+        number_of_duplicates = (qry.count_files_checksum(session, checksum)
+                                .first()[0]
+                                )
         pkg_infos = Infobox(session,
                             location.get_package(),
                             location.get_version()).get_infos()
@@ -190,7 +191,7 @@ class SourceView(GeneralView):
                         raw_url=raw_url,
                         path=path,
                         text_file=text_file,
-                        stat=location.get_stat(location.sources_path),
+                        stat=qry.get_stat(location.sources_path),
                         checksum=checksum,
                         number_of_duplicates=number_of_duplicates,
                         pkg_infos=pkg_infos
@@ -237,7 +238,7 @@ class SourceView(GeneralView):
             self.render_func = bind_render(
                 self.d['templatename'],
                 nlines=sourcefile.get_number_of_lines(),
-                pathl=Location.get_path_links(".source", path),
+                pathl=qry.get_path_links(".source", path),
                 file_language=sourcefile.get_file_language(),
                 msgs=sourcefile.get_msgdict(),
                 code=sourcefile)
@@ -249,7 +250,7 @@ class SourceView(GeneralView):
                     raw_url=raw_url,
                     path=path,
                     text_file=text_file,
-                    stat=location.get_stat(location.sources_path),
+                    stat=qry.get_stat(location.sources_path),
                     checksum=checksum,
                     number_of_duplicates=number_of_duplicates,
                     pkg_infos=pkg_infos
@@ -271,7 +272,7 @@ class SourceView(GeneralView):
         when 'latest' is provided instead of a version number
         """
         try:
-            versions = PackageName.list_versions(session, package)
+            versions = qry.list_versions(session, package)
         except InvalidPackageOrVersionError:
             raise Http404Error("%s not found" % package)
         # the latest version is the latest item in the
@@ -315,7 +316,7 @@ class SourceView(GeneralView):
                 if check_for_alias:
                     version = check_for_alias.suite
                 try:
-                    versions_w_suites = PackageName.list_versions_w_suites(
+                    versions_w_suites = qry.list_versions_w_suites(
                         session, package)
                 except InvalidPackageOrVersionError:
                     raise Http404Error("%s not found" % package)
diff --git a/debsources/app/views.py b/debsources/app/views.py
index ba27494..bb47adf 100644
--- a/debsources/app/views.py
+++ b/debsources/app/views.py
@@ -23,15 +23,14 @@ from flask import (
     current_app, jsonify, render_template, request, url_for, redirect)
 from flask.views import View
 
-from sqlalchemy import func as sql_func
-
 from debsources.excepts import (
     Http500Error, Http404Error, Http404ErrorSuggestions, Http403Error)
-from debsources.models import (
-    Ctag, Package, PackageName, Checksum, File, Suite)
+from debsources.models import Package
+import debsources.query as qry
 from debsources.sqla_session import _close_session
 from debsources import local_info
 from debsources.consts import SUITES
+
 from .forms import SearchForm
 from .pagination import Pagination
 from .infobox import Infobox
@@ -70,7 +69,7 @@ def skeleton_variables():
     # TODO, this part should be moved to per blueprint context processor
     last_update = local_info.read_update_ts(update_ts_file)
 
-    packages_prefixes = PackageName.get_packages_prefixes(
+    packages_prefixes = qry.get_packages_prefixes(
         app.config["CACHE_DIR"])
 
     credits_file = os.path.join(app.config["LOCAL_DIR"], "credits.html")
@@ -120,7 +119,7 @@ class ErrorHandler(object):
             if isinstance(error, Http404ErrorSuggestions):
                 # let's suggest all the possible locations with a different
                 # package version
-                possible_versions = PackageName.list_versions(
+                possible_versions = qry.list_versions(
                     session, error.package)
                 suggestions = ['/'.join(
                     filter(None, [error.package, v.version, error.path]))
@@ -265,35 +264,17 @@ class SearchView(GeneralView):
         # if suite is not specified
         if not suite:
             try:
-                exact_matching = (session.query(PackageName)
-                                  .filter_by(name=query)
-                                  .first())
-
-                other_results = (session.query(PackageName)
-                                 .filter(sql_func.lower(PackageName.name)
-                                         .contains(query.lower()))
-                                 .order_by(PackageName.name)
-                                 )
+                exact_matching = qry.get_pkg_by_name(session, query)
+
+                other_results = qry.get_pkg_by_similar_name(session, query)
             except Exception as e:
                 raise Http500Error(e)  # db problem, ...
         else:
             try:
-                exact_matching = (session.query(PackageName)
-                                  .filter(sql_func.lower(Suite.suite)
-                                          == suite)
-                                  .filter(Suite.package_id == Package.id)
-                                  .filter(Package.name_id == PackageName.id)
-                                  .filter(PackageName.name == query)
-                                  .first())
-
-                other_results = (session.query(PackageName)
-                                 .filter(sql_func.lower(Suite.suite)
-                                         == suite)
-                                 .filter(Suite.package_id == Package.id)
-                                 .filter(Package.name_id == PackageName.id)
-                                 .filter(sql_func.lower(PackageName.name)
-                                         .contains(query.lower()))
-                                 .order_by(PackageName.name))
+                exact_matching = qry.get_pkg_by_name(session, query, suite)
+
+                other_results = qry.get_pkg_by_similar_name(session,
+                                                            query, suite)
             except Exception as e:
                 raise Http500Error(e)  # db problem, ...
 
@@ -334,19 +315,7 @@ class ChecksumView(GeneralView):
         Returns a list of files whose hexdigest is checksum.
         You can slice the results, passing slice=(start, end).
         """
-        results = (session.query(PackageName.name.label("package"),
-                                 Package.version.label("version"),
-                                 Checksum.file_id.label("file_id"),
-                                 File.path.label("path"))
-                   .filter(Checksum.sha256 == checksum)
-                   .filter(Checksum.package_id == Package.id)
-                   .filter(Checksum.file_id == File.id)
-                   .filter(Package.name_id == PackageName.id)
-                   )
-        if package is not None and package != "":
-            results = results.filter(PackageName.name == package)
-
-        results = results.order_by("package", "version", "path")
+        results = qry.get_files_by_checksum(session, checksum, package)
 
         if slice_ is not None:
             results = results.slice(slice_[0], slice_[1])
@@ -369,12 +338,7 @@ class ChecksumView(GeneralView):
         package = request.args.get("package") or None
 
         # we count the number of results:
-        count = (session.query(sql_func.count(Checksum.id))
-                 .filter(Checksum.sha256 == checksum))
-        if package is not None and package != "":  # (only within the package)
-            count = (count.filter(PackageName.name == package)
-                     .filter(Checksum.package_id == Package.id)
-                     .filter(Package.name_id == PackageName.id))
+        count = qry.count_files_checksum(session, checksum, package)
         count = count.first()[0]
 
         # pagination:
@@ -425,8 +389,9 @@ class CtagView(GeneralView):
         else:
             pagination = None
             slice_ = None
-        count, results = Ctag.find_ctag(session, ctag, slice_=slice_,
-                                        package=package)
+
+        (count, results) = qry.find_ctag(session, ctag, slice_=slice_,
+                                         package=package)
         if self.d.get('pagination'):
             pagination = Pagination(page, offset, count)
         else:
@@ -452,27 +417,16 @@ class PrefixView(GeneralView):
         suite = suite.lower()
         if suite == "all":
             suite = ""
-        if prefix in PackageName.get_packages_prefixes(
+        if prefix in qry.get_packages_prefixes(
                 app.config["CACHE_DIR"]):
             try:
                 if not suite:
-                    packages = (session.query(PackageName)
-                                .filter(sql_func.lower(PackageName.name)
-                                        .startswith(prefix))
-                                .order_by(PackageName.name)
-                                .all()
-                                )
+                    packages = qry.get_pkg_filter_prefix(session,
+                                                         prefix).all()
                 else:
-                    packages = (session.query(PackageName)
-                                .filter(sql_func.lower(Suite.suite)
-                                        == suite)
-                                .filter(Suite.package_id == Package.id)
-                                .filter(Package.name_id == PackageName.id)
-                                .filter(sql_func.lower(PackageName.name)
-                                        .startswith(prefix))
-                                .order_by(PackageName.name)
-                                .all()
-                                )
+                    packages = qry.get_pkg_filter_prefix(session,
+                                                         prefix,
+                                                         suite).all()
 
                 packages = [p.to_dict() for p in packages]
             except Exception as e:
@@ -488,10 +442,7 @@ class ListPackagesView(GeneralView):
     def get_objects(self, page=1):
         if not self.d.get('pagination'):  # api form, we retrieve all packages
             try:
-                packages = (session.query(PackageName)
-                            .order_by(PackageName.name)
-                            .all()
-                            )
+                packages = qry.get_all_packages(session).all()
                 packages = [p.to_dict() for p in packages]
                 return dict(packages=packages)
             except Exception as e:
@@ -505,11 +456,8 @@ class ListPackagesView(GeneralView):
                 start = (page - 1) * offset
                 end = start + offset
 
-                count_packages = (session.query(PackageName)
-                                  .count()
-                                  )
-                packages = (session.query(PackageName)
-                            .order_by(PackageName.name)
+                count_packages = qry.count_packages(session)
+                packages = (qry.get_all_packages(session)
                             .slice(start, end)
                             )
                 pagination = Pagination(page, offset, count_packages)
diff --git a/debsources/models.py b/debsources/models.py
index c47094c..fb3b952 100644
--- a/debsources/models.py
+++ b/debsources/models.py
@@ -16,31 +16,17 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-import os
-import magic
-import stat
-import fnmatch
-from collections import namedtuple
-
 from sqlalchemy import Column, ForeignKey
 from sqlalchemy import UniqueConstraint, PrimaryKeyConstraint
 from sqlalchemy import Index
 from sqlalchemy import Boolean, Date, DateTime, Integer, LargeBinary, String
 from sqlalchemy import Enum
-from sqlalchemy import and_
-from sqlalchemy import func as sql_func
 from sqlalchemy.orm import relationship
 from sqlalchemy.ext.declarative import declarative_base
 
-from debian.debian_support import version_compare
 
-from debsources.excepts import InvalidPackageOrVersionError, \
-    FileOrFolderNotFound
 from debsources.consts import VCS_TYPES, SLOCCOUNT_LANGUAGES, \
-    CTAGS_LANGUAGES, METRIC_TYPES, AREAS, PREFIXES_DEFAULT
-from debsources import filetype
-from debsources.debmirror import SourcePackage
-from debsources.consts import SUITES
+    CTAGS_LANGUAGES, METRIC_TYPES
 
 Base = declarative_base()
 
@@ -65,77 +51,6 @@ class PackageName(Base):
     def __repr__(self):
         return self.name
 
-    @staticmethod
-    def get_packages_prefixes(cache_dir):
-        """
-        returns the packages prefixes (a, b, ..., liba, libb, ..., y, z)
-        cache_dir: the cache directory, usually comes from the app config
-        """
-        try:
-            with open(os.path.join(cache_dir, 'pkg-prefixes')) as f:
-                prefixes = [l.rstrip() for l in f]
-        except IOError:
-            prefixes = PREFIXES_DEFAULT
-        return prefixes
-
-    @staticmethod
-    def list_versions(session, packagename, suite=""):
-        """
-        return all versions of a packagename. if suite is specified, only
-        versions contained in that suite are returned.
-        """
-        try:
-            name_id = session.query(PackageName) \
-                             .filter(PackageName.name == packagename) \
-                             .first().id
-        except Exception:
-            raise InvalidPackageOrVersionError(packagename)
-        try:
-            if not suite:
-                versions = session.query(Package) \
-                                  .filter(Package.name_id == name_id).all()
-            else:
-                versions = (session.query(Package)
-                                   .filter(Package.name_id == name_id)
-                                   .filter(sql_func.lower(Suite.suite)
-                                           == suite)
-                                   .filter(Suite.package_id == Package.id)
-                                   .all())
-        except Exception:
-            raise InvalidPackageOrVersionError(packagename)
-        # we sort the versions according to debian versions rules
-        versions = sorted(versions, cmp=version_compare)
-        return versions
-
-    @staticmethod
-    def list_versions_w_suites(session, packagename, suite=""):
-        """
-        return versions with suites. if suite is provided, then only return
-        versions contained in that suite.
-        """
-        # FIXME a left outer join on (Package, Suite) is more preferred.
-        # However, per https://stackoverflow.com/a/997467, custom aggregation
-        # function to concatenate the suite names for the group_by should be
-        # defined on database connection level.
-        versions = PackageName.list_versions(session, packagename, suite)
-        versions_w_suites = []
-        try:
-            for v in versions:
-                suites = session.query(Suite) \
-                                .filter(Suite.package_id == v.id) \
-                                .all()
-                # sort the suites according to debsources.consts.SUITES
-                # use keyfunc to make it py3 compatible
-                suites.sort(key=lambda s: SUITES['all'].index(s.suite))
-                suites = [s.suite for s in suites]
-                v = v.to_dict()
-                v['suites'] = suites
-                versions_w_suites.append(v)
-        except Exception:
-            raise InvalidPackageOrVersionError(packagename)
-
-        return versions_w_suites
-
     def to_dict(self):
         """
         simply serializes a package (because SQLAlchemy query results
@@ -364,41 +279,6 @@ class Ctag(Base):
     #                .filter(Ctag.tag in ctags)
     #                .filter(Ctag
 
-    @staticmethod
-    def find_ctag(session, ctag, package=None, slice_=None):
-        """
-        Returns places in the code where a ctag is found.
-             tuple (count, [sliced] results)
-
-        session: an SQLAlchemy session
-        ctag: the ctag to search
-        package: limit results to package
-        """
-
-        results = (session.query(PackageName.name.label("package"),
-                                 Package.version.label("version"),
-                                 Ctag.file_id.label("file_id"),
-                                 File.path.label("path"),
-                                 Ctag.line.label("line"))
-                   .filter(Ctag.tag == ctag)
-                   .filter(Ctag.package_id == Package.id)
-                   .filter(Ctag.file_id == File.id)
-                   .filter(Package.name_id == PackageName.id)
-                   )
-        if package is not None:
-            results = results.filter(PackageName.name == package)
-
-        results = results.order_by(Ctag.package_id, File.path)
-        count = results.count()
-        if slice_ is not None:
-            results = results.slice(slice_[0], slice_[1])
-        results = [dict(package=res.package,
-                        version=res.version,
-                        path=res.path,
-                        line=res.line)
-                   for res in results.all()]
-        return (count, results)
-
 
 class Metric(Base):
     __tablename__ = 'metrics'
@@ -490,266 +370,3 @@ class HistorySlocCount(Base):
     def __init__(self, suite, timestamp):
         self.suite = suite
         self.timestamp = timestamp
-
-# it's used in Location.get_stat
-# to bypass flake8 complaints, we do not inject the global namespace
-# with globals()["LongFMT"] = namedtuple...
-LongFMT = namedtuple("LongFMT", ["type", "perms", "size", "symlink_dest"])
-
-
-class Location(object):
-    """ a location in a package, can be a directory or a file """
-
-    def _get_debian_path(self, session, package, version, sources_dir):
-        """
-        Returns the Debian path of a package version.
-        For example: main/h
-                     contrib/libz
-        It's the path of a *version*, since a package can have multiple
-        versions in multiple areas (ie main/contrib/nonfree).
-
-        sources_dir: the sources directory, usually comes from the app config
-        """
-        prefix = SourcePackage.pkg_prefix(package)
-
-        try:
-            p_id = session.query(PackageName) \
-                          .filter(PackageName.name == package).first().id
-            varea = session.query(Package) \
-                           .filter(and_(Package.name_id == p_id,
-                                        Package.version == version)) \
-                           .first().area
-        except:
-            # the package or version doesn't exist in the database
-            # BUT: packages are stored for a longer time in the filesystem
-            # to allow codesearch.d.n and others less up-to-date platforms
-            # to point here.
-            # Problem: we don't know the area of such a package
-            # so we try in main, contrib and non-free.
-            for area in AREAS:
-                if os.path.exists(os.path.join(sources_dir, area,
-                                               prefix, package, version)):
-                    return os.path.join(area, prefix)
-
-            raise InvalidPackageOrVersionError("%s %s" % (package, version))
-
-        return os.path.join(varea, prefix)
-
-    def __init__(self, session, sources_dir, sources_static,
-                 package, version="", path=""):
-        """ initialises useful attributes """
-        debian_path = self._get_debian_path(session,
-                                            package, version, sources_dir)
-        self.package = package
-        self.version = version
-        self.path = path
-        self.path_to = os.path.join(package, version, path)
-
-        self.sources_path = os.path.join(
-            sources_dir,
-            debian_path,
-            self.path_to)
-
-        self.version_path = os.path.join(
-            sources_dir,
-            debian_path,
-            package,
-            version)
-
-        if not(os.path.exists(self.sources_path)):
-            raise FileOrFolderNotFound("%s" % (self.path_to))
-
-        self.sources_path_static = os.path.join(
-            sources_static,
-            debian_path,
-            self.path_to)
-
-    def is_dir(self):
-        """ True if self is a directory, False if it's not """
-        return os.path.isdir(self.sources_path)
-
-    def is_file(self):
-        """ True if sels is a file, False if it's not """
-        return os.path.isfile(self.sources_path)
-
-    def is_symlink(self):
-        """ True if a folder/file is a symbolic link file, False if it's not
-        """
-        return os.path.islink(self.sources_path)
-
-    def get_package(self):
-        return self.package
-
-    def get_version(self):
-        return self.version
-
-    def get_path(self):
-        return self.path
-
-    def get_deepest_element(self):
-        if self.version == "":
-            return self.package
-        elif self.path == "":
-            return self.version
-        else:
-            return self.path.split("/")[-1]
-
-    def get_path_to(self):
-        return self.path_to.rstrip("/")
-
-    @staticmethod
-    def get_stat(sources_path):
-        """
-        Returns the filetype and permissions of the folder/file
-        on the disk, unix-styled.
-        """
-        # When porting to Python3, use stat.filemode directly
-        sources_stat = os.lstat(sources_path)
-        sources_mode, sources_size = sources_stat.st_mode, sources_stat.st_size
-        perm_flags = [
-            (stat.S_IRUSR, "r", "-"),
-            (stat.S_IWUSR, "w", "-"),
-            (stat.S_IXUSR, "x", "-"),
-            (stat.S_IRGRP, "r", "-"),
-            (stat.S_IWGRP, "w", "-"),
-            (stat.S_IXGRP, "x", "-"),
-            (stat.S_IROTH, "r", "-"),
-            (stat.S_IWOTH, "w", "-"),
-            (stat.S_IXOTH, "x", "-"),
-            ]
-        # XXX these flags should be enough.
-        type_flags = [
-            (stat.S_ISLNK, "l"),
-            (stat.S_ISREG, "-"),
-            (stat.S_ISDIR, "d"),
-            ]
-        # add the file type: d/l/-
-        file_type = " "
-        for ft, sign in type_flags:
-            if ft(sources_mode):
-                file_type = sign
-                break
-        file_perms = ""
-        for (flag, do_true, do_false) in perm_flags:
-            file_perms += do_true if (sources_mode & flag) else do_false
-
-        file_size = sources_size
-
-        symlink_dest = None
-        if file_type == "l":
-            symlink_dest = os.readlink(sources_path)
-
-        return vars(LongFMT(file_type, file_perms, file_size, symlink_dest))
-
-    @staticmethod
-    def get_path_links(endpoint, path_to):
-        """
-        returns the path hierarchy with urls, to use with 'You are here:'
-        [(name, url(name)), (...), ...]
-        """
-        path_dict = path_to.split('/')
-        pathl = []
-
-        # we import flask here, in order to permit the use of this module
-        # without requiring the user to have flask (e.g. bin/debsources-update
-        # can run in another machine without flask, because it doesn't use
-        # this method)
-        from flask import url_for
-
-        for (i, p) in enumerate(path_dict):
-            pathl.append((p, url_for(endpoint,
-                                     path_to='/'.join(path_dict[:i+1]))))
-        return pathl
-
-
-class Directory(object):
-    """ a folder in a package """
-
-    def __init__(self, location, hidden_files=[]):
-        self.sources_path = location.sources_path
-        self.location = location
-        self.hidden_files = hidden_files
-
-    def get_listing(self):
-        """
-        returns the list of folders/files in a directory,
-        along with their type (directory/file)
-        in a tuple (name, type)
-        """
-        def get_type(f):
-            if os.path.isdir(os.path.join(self.sources_path, f)):
-                return "directory"
-            else:
-                return "file"
-        get_stat, join_path = self.location.get_stat, os.path.join
-        listing = sorted(dict(name=f, type=get_type(f), hidden=False,
-                              stat=get_stat(join_path(self.sources_path, f)))
-                         for f in os.listdir(self.sources_path))
-
-        for hidden_file in self.hidden_files:
-            for f in listing:
-                full_path = os.path.join(self.location.sources_path, f['name'])
-                if f['type'] == "directory":
-                    full_path += "/"
-                f['hidden'] = (f['hidden']
-                               or fnmatch.fnmatch(full_path, hidden_file))
-
-        return listing
-
-
-class SourceFile(object):
-    """ a source file in a package """
-
-    def __init__(self, location):
-        self.location = location
-        self.sources_path = location.sources_path
-        self.sources_path_static = location.sources_path_static
-        self.mime = self._find_mime()
-
-    def _find_mime(self):
-        """ returns the mime encoding and type of a file """
-        mime = magic.open(magic.MIME_TYPE)
-        mime.load()
-        type_ = mime.file(self.sources_path)
-        mime.close()
-        mime = magic.open(magic.MIME_ENCODING)
-        mime.load()
-        encoding = mime.file(self.sources_path)
-        mime.close()
-        return dict(encoding=encoding, type=type_)
-
-    def get_mime(self):
-        return self.mime
-
-    def get_sha256sum(self, session):
-        """
-        Queries the DB and returns the shasum of the file.
-        """
-        shasum = session.query(Checksum.sha256) \
-                        .filter(Checksum.package_id == Package.id) \
-                        .filter(Package.name_id == PackageName.id) \
-                        .filter(File.id == Checksum.file_id) \
-                        .filter(PackageName.name == self.location.package) \
-                        .filter(Package.version == self.location.version) \
-                        .filter(File.path == str(self.location.path)) \
-                        .first()
-        # WARNING: in the DB path is binary, and here
-        # location.path is unicode, because the path comes from
-        # the URL. TODO: check with non-unicode paths
-        if shasum:
-            shasum = shasum[0]
-        return shasum
-
-    def istextfile(self):
-        """True if self is a text file, False if it's not.
-
-        """
-        return filetype.is_text_file(self.mime['type'])
-        # for substring in text_file_mimes:
-        #     if substring in self.mime['type']:
-        #         return True
-        # return False
-
-    def get_raw_url(self):
-        """ return the raw url on disk (e.g. data/main/a/azerty/foo.bar) """
-        return self.sources_path_static
diff --git a/debsources/navigation.py b/debsources/navigation.py
new file mode 100644
index 0000000..76d20eb
--- /dev/null
+++ b/debsources/navigation.py
@@ -0,0 +1,209 @@
+import os
+import magic
+import fnmatch
+
+from sqlalchemy import and_
+
+from debsources.models import (Checksum, File,
+                               Package, PackageName)
+from debsources import filetype
+from debsources.consts import AREAS
+from debsources.debmirror import SourcePackage
+from debsources.excepts import FileOrFolderNotFound, \
+    InvalidPackageOrVersionError
+import debsources.query as qry
+
+
+class Location(object):
+    """ a location in a package, can be a directory or a file """
+
+    def _get_debian_path(self, session, package, version, sources_dir):
+        """
+        Returns the Debian path of a package version.
+        For example: main/h
+                     contrib/libz
+        It's the path of a *version*, since a package can have multiple
+        versions in multiple areas (ie main/contrib/nonfree).
+
+        sources_dir: the sources directory, usually comes from the app config
+        """
+        prefix = SourcePackage.pkg_prefix(package)
+
+        try:
+            p_id = session.query(PackageName) \
+                          .filter(PackageName.name == package).first().id
+            varea = session.query(Package) \
+                           .filter(and_(Package.name_id == p_id,
+                                        Package.version == version)) \
+                           .first().area
+        except:
+            # the package or version doesn't exist in the database
+            # BUT: packages are stored for a longer time in the filesystem
+            # to allow codesearch.d.n and others less up-to-date platforms
+            # to point here.
+            # Problem: we don't know the area of such a package
+            # so we try in main, contrib and non-free.
+            for area in AREAS:
+                if os.path.exists(os.path.join(sources_dir, area,
+                                               prefix, package, version)):
+                    return os.path.join(area, prefix)
+
+            raise InvalidPackageOrVersionError("%s %s" % (package, version))
+
+        return os.path.join(varea, prefix)
+
+    def __init__(self, session, sources_dir, sources_static,
+                 package, version="", path=""):
+        """ initialises useful attributes """
+        debian_path = self._get_debian_path(session,
+                                            package, version, sources_dir)
+        self.package = package
+        self.version = version
+        self.path = path
+        self.path_to = os.path.join(package, version, path)
+
+        self.sources_path = os.path.join(
+            sources_dir,
+            debian_path,
+            self.path_to)
+
+        self.version_path = os.path.join(
+            sources_dir,
+            debian_path,
+            package,
+            version)
+
+        if not(os.path.exists(self.sources_path)):
+            raise FileOrFolderNotFound("%s" % (self.path_to))
+
+        self.sources_path_static = os.path.join(
+            sources_static,
+            debian_path,
+            self.path_to)
+
+    def is_dir(self):
+        """ True if self is a directory, False if it's not """
+        return os.path.isdir(self.sources_path)
+
+    def is_file(self):
+        """ True if sels is a file, False if it's not """
+        return os.path.isfile(self.sources_path)
+
+    def is_symlink(self):
+        """ True if a folder/file is a symbolic link file, False if it's not
+        """
+        return os.path.islink(self.sources_path)
+
+    def get_package(self):
+        return self.package
+
+    def get_version(self):
+        return self.version
+
+    def get_path(self):
+        return self.path
+
+    def get_deepest_element(self):
+        if self.version == "":
+            return self.package
+        elif self.path == "":
+            return self.version
+        else:
+            return self.path.split("/")[-1]
+
+    def get_path_to(self):
+        return self.path_to.rstrip("/")
+
+
+class Directory(object):
+    """ a folder in a package """
+
+    def __init__(self, location, hidden_files=[]):
+        # if the directory is a toplevel one, we remove the .pc folder
+        self.sources_path = location.sources_path
+        self.location = location
+        self.hidden_files = hidden_files
+
+    def get_listing(self):
+        """
+        returns the list of folders/files in a directory,
+        along with their type (directory/file)
+        in a tuple (name, type)
+        """
+        def get_type(f):
+            if os.path.isdir(os.path.join(self.sources_path, f)):
+                return "directory"
+            else:
+                return "file"
+        get_stat, join_path = qry.get_stat, os.path.join
+        listing = sorted(dict(name=f, type=get_type(f), hidden=False,
+                              stat=get_stat(join_path(self.sources_path, f)))
+                         for f in os.listdir(self.sources_path))
+
+        for hidden_file in self.hidden_files:
+            for f in listing:
+                full_path = os.path.join(self.location.sources_path, f['name'])
+                if f['type'] == "directory":
+                    full_path += "/"
+                f['hidden'] = (f['hidden']
+                               or fnmatch.fnmatch(full_path, hidden_file))
+
+        return listing
+
+
+class SourceFile(object):
+    """ a source file in a package """
+
+    def __init__(self, location):
+        self.location = location
+        self.sources_path = location.sources_path
+        self.sources_path_static = location.sources_path_static
+        self.mime = self._find_mime()
+
+    def _find_mime(self):
+        """ returns the mime encoding and type of a file """
+        mime = magic.open(magic.MIME_TYPE)
+        mime.load()
+        type_ = mime.file(self.sources_path)
+        mime.close()
+        mime = magic.open(magic.MIME_ENCODING)
+        mime.load()
+        encoding = mime.file(self.sources_path)
+        mime.close()
+        return dict(encoding=encoding, type=type_)
+
+    def get_mime(self):
+        return self.mime
+
+    def get_sha256sum(self, session):
+        """
+        Queries the DB and returns the shasum of the file.
+        """
+        shasum = session.query(Checksum.sha256) \
+                        .filter(Checksum.package_id == Package.id) \
+                        .filter(Package.name_id == PackageName.id) \
+                        .filter(File.id == Checksum.file_id) \
+                        .filter(PackageName.name == self.location.package) \
+                        .filter(Package.version == self.location.version) \
+                        .filter(File.path == str(self.location.path)) \
+                        .first()
+        # WARNING: in the DB path is binary, and here
+        # location.path is unicode, because the path comes from
+        # the URL. TODO: check with non-unicode paths
+        if shasum:
+            shasum = shasum[0]
+        return shasum
+
+    def istextfile(self):
+        """True if self is a text file, False if it's not.
+
+        """
+        return filetype.is_text_file(self.mime['type'])
+        # for substring in text_file_mimes:
+        #     if substring in self.mime['type']:
+        #         return True
+        # return False
+
+    def get_raw_url(self):
+        """ return the raw url on disk (e.g. data/main/a/azerty/foo.bar) """
+        return self.sources_path_static
diff --git a/debsources/query.py b/debsources/query.py
new file mode 100644
index 0000000..96eeaaa
--- /dev/null
+++ b/debsources/query.py
@@ -0,0 +1,311 @@
+import os
+import stat
+
+from sqlalchemy import and_
+from sqlalchemy import func as sql_func
+from collections import namedtuple
+
+from debian.debian_support import version_compare
+from debsources.consts import PREFIXES_DEFAULT
+from debsources.consts import SUITES
+from debsources.excepts import InvalidPackageOrVersionError
+from debsources.models import (
+    Checksum, Ctag, File, Package, PackageName, Suite, SuiteInfo)
+
+
+LongFMT = namedtuple("LongFMT", ["type", "perms", "size", "symlink_dest"])
+
+''' ORM queries '''
+
+
+def get_packages_prefixes(cache_dir):
+    """
+    returns the packages prefixes (a, b, ..., liba, libb, ..., y, z)
+    cache_dir: the cache directory, usually comes from the app config
+    """
+    try:
+        with open(os.path.join(cache_dir, 'pkg-prefixes')) as f:
+            prefixes = [l.rstrip() for l in f]
+    except IOError:
+        prefixes = PREFIXES_DEFAULT
+    return prefixes
+
+
+def list_versions(session, packagename, suite=""):
+    """
+    return all versions of a packagename. if suite is specified, only
+    versions contained in that suite are returned.
+    """
+    try:
+        name_id = session.query(PackageName) \
+                         .filter(PackageName.name == packagename) \
+                         .first().id
+    except Exception:
+        raise InvalidPackageOrVersionError(packagename)
+    try:
+        if not suite:
+            versions = session.query(Package) \
+                              .filter(Package.name_id == name_id).all()
+        else:
+            versions = (session.query(Package)
+                               .filter(Package.name_id == name_id)
+                               .filter(sql_func.lower(Suite.suite)
+                                       == suite)
+                               .filter(Suite.package_id == Package.id)
+                               .all())
+    except Exception:
+        raise InvalidPackageOrVersionError(packagename)
+    # we sort the versions according to debian versions rules
+    versions = sorted(versions, cmp=version_compare)
+    return versions
+
+
+def list_versions_w_suites(session, packagename, suite=""):
+    """
+    return versions with suites. if suite is provided, then only return
+    versions contained in that suite.
+    """
+    # FIXME a left outer join on (Package, Suite) is more preferred.
+    # However, per https://stackoverflow.com/a/997467, custom aggregation
+    # function to concatenate the suite names for the group_by should be
+    # defined on database connection level.
+    versions = list_versions(session, packagename, suite)
+    versions_w_suites = []
+    try:
+        for v in versions:
+            suites = session.query(Suite) \
+                            .filter(Suite.package_id == v.id) \
+                            .all()
+            # sort the suites according to debsources.consts.SUITES
+            # use keyfunc to make it py3 compatible
+            suites.sort(key=lambda s: SUITES['all'].index(s.suite))
+            suites = [s.suite for s in suites]
+            v = v.to_dict()
+            v['suites'] = suites
+            versions_w_suites.append(v)
+    except Exception:
+        raise InvalidPackageOrVersionError(packagename)
+    return versions_w_suites
+
+
+def find_ctag(session, ctag, package=None, slice_=None):
+    """
+    Returns places in the code where a ctag is found.
+         tuple (count, [sliced] results)
+
+    session: an SQLAlchemy session
+    ctag: the ctag to search
+    package: limit results to package
+    """
+
+    results = (session.query(PackageName.name.label("package"),
+                             Package.version.label("version"),
+                             Ctag.file_id.label("file_id"),
+                             File.path.label("path"),
+                             Ctag.line.label("line"))
+               .filter(Ctag.tag == ctag)
+               .filter(Ctag.package_id == Package.id)
+               .filter(Ctag.file_id == File.id)
+               .filter(Package.name_id == PackageName.id)
+               )
+    if package is not None:
+        results = results.filter(PackageName.name == package)
+
+    results = results.order_by(Ctag.package_id, File.path)
+    count = results.count()
+    if slice_ is not None:
+        results = results.slice(slice_[0], slice_[1])
+    results = [dict(package=res.package,
+                    version=res.version,
+                    path=res.path,
+                    line=res.line)
+               for res in results.all()]
+    return (count, results)
+
+''' Navigation Queries '''
+
+
+def get_path_links(endpoint, path_to):
+    """
+    returns the path hierarchy with urls, to use with 'You are here:'
+    [(name, url(name)), (...), ...]
+    """
+    path_dict = path_to.split('/')
+    pathl = []
+
+    # we import flask here, in order to permit the use of this module
+    # without requiring the user to have flask (e.g. bin/debsources-update
+    # can run in another machine without flask, because it doesn't use
+    # this method)
+    from flask import url_for
+
+    for (i, p) in enumerate(path_dict):
+        pathl.append((p, url_for(endpoint,
+                                 path_to='/'.join(path_dict[:i+1]))))
+    return pathl
+
+
+def get_stat(sources_path):
+    """
+    Returns the filetype and permissions of the folder/file
+    on the disk, unix-styled.
+    """
+    # When porting to Python3, use stat.filemode directly
+    sources_stat = os.lstat(sources_path)
+    sources_mode, sources_size = sources_stat.st_mode, sources_stat.st_size
+    perm_flags = [
+        (stat.S_IRUSR, "r", "-"),
+        (stat.S_IWUSR, "w", "-"),
+        (stat.S_IXUSR, "x", "-"),
+        (stat.S_IRGRP, "r", "-"),
+        (stat.S_IWGRP, "w", "-"),
+        (stat.S_IXGRP, "x", "-"),
+        (stat.S_IROTH, "r", "-"),
+        (stat.S_IWOTH, "w", "-"),
+        (stat.S_IXOTH, "x", "-"),
+        ]
+    # XXX these flags should be enough.
+    type_flags = [
+        (stat.S_ISLNK, "l"),
+        (stat.S_ISREG, "-"),
+        (stat.S_ISDIR, "d"),
+        ]
+    # add the file type: d/l/-
+    file_type = " "
+    for ft, sign in type_flags:
+        if ft(sources_mode):
+            file_type = sign
+            break
+    file_perms = ""
+    for (flag, do_true, do_false) in perm_flags:
+        file_perms += do_true if (sources_mode & flag) else do_false
+
+    file_size = sources_size
+
+    symlink_dest = None
+    if file_type == "l":
+        symlink_dest = os.readlink(sources_path)
+
+    return vars(LongFMT(file_type, file_perms, file_size, symlink_dest))
+
+
+''' SQLAlchemy queries '''
+
+
+def get_suite_info(session, suite, first=None):
+    '''Return SuiteInfo of a `suite`
+
+    '''
+    return session.query(SuiteInfo).filter(SuiteInfo.name == suite).first()
+
+
+def count_files_checksum(session, checksum, pkg=None):
+    '''Count files with `checksum`
+
+    '''
+    result = (session.query(sql_func.count(Checksum.id))
+              .filter(Checksum.sha256 == checksum)
+              )
+    if pkg is not None and pkg is not "":
+        result = (result.filter(PackageName.name == pkg)
+                  .filter(Checksum.package_id == Package.id)
+                  .filter(Package.name_id == PackageName.id))
+    return result
+
+
+def get_pkg_by_name(session, pkg, suite=None):
+    ''' Returns the package filtered by name `pkg`
+        Filter by `suite`
+
+    '''
+    result = (session.query(PackageName)
+              .filter_by(name=pkg)
+              )
+
+    if suite is not None and suite is not "":
+        result = (result.filter(sql_func.lower(Suite.suite)
+                                == suite)
+                  .filter(Suite.package_id == Package.id)
+                  .filter(Package.name_id == PackageName.id))
+    return result.first()
+
+
+def get_pkg_by_similar_name(session, pkg, suite=None):
+    ''' Get non exact package result based on name `pkg`
+        Filter by `suite`
+
+    '''
+    result = (session.query(PackageName)
+              .filter(sql_func.lower(PackageName.name)
+              .contains(pkg.lower()))
+              .order_by(PackageName.name))
+
+    if suite is not None and suite is not "":
+        return filter_pkg_by_suite(session, result, suite)
+    else:
+        return result
+
+
+def filter_pkg_by_suite(session, result, suite):
+    ''' Filter `result` with suite
+
+    '''
+    return (result.filter(sql_func.lower(Suite.suite) == suite)
+            .filter(Suite.package_id == Package.id)
+            .filter(Package.name_id == PackageName.id)
+            .order_by(PackageName.name)
+            )
+
+
+def get_files_by_checksum(session, checksum, package=None):
+    ''' Returns a list of files whose hexdigest is checksum.
+        Filter with package
+
+    '''
+    results = (session.query(PackageName.name.label("package"),
+                             Package.version.label("version"),
+                             Checksum.file_id.label("file_id"),
+                             File.path.label("path"))
+               .filter(Checksum.sha256 == checksum)
+               .filter(Checksum.package_id == Package.id)
+               .filter(Checksum.file_id == File.id)
+               .filter(Package.name_id == PackageName.id)
+               )
+
+    if package is not None and package != "":
+
+        results = results.filter(PackageName.name == package)
+
+    return results.order_by("package", "version", "path")
+
+
+def get_pkg_filter_prefix(session, prefix, suite=None):
+    '''Get packages filter by `prefix`
+
+    '''
+    result = (session.query(PackageName)
+              .filter(sql_func.lower(PackageName.name)
+                      .startswith(prefix))
+              .order_by(PackageName.name)
+              )
+
+    if suite is not None and suite is not "":
+        return filter_pkg_by_suite(session, result, suite)
+    else:
+        return result
+
+
+def get_all_packages(session):
+    ''' Get the list of packages
+
+    '''
+    return (session.query(PackageName)
+            .order_by(PackageName.name)
+            )
+
+
+def count_packages(session):
+    ''' Count the packages
+
+    '''
+    return (session.query(PackageName).count())
diff --git a/debsources/tests/test_queries.py b/debsources/tests/test_queries.py
new file mode 100644
index 0000000..e2fcd09
--- /dev/null
+++ b/debsources/tests/test_queries.py
@@ -0,0 +1,70 @@
+import unittest
+
+from nose.plugins.attrib import attr
+
+import debsources.query as qry
+from debsources.tests.db_testing import DbTestFixture
+from debsources.tests.testdata import TEST_DB_NAME
+
+
+@attr('Queries')
+class QueriesTest(unittest.TestCase, DbTestFixture):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.db_setup_cls()
+
+        # creates an app object, which is used to run queries
+        from debsources.app import app_wrapper
+
+        # erases a few configuration parameters needed for testing:
+        uri = "postgresql:///" + TEST_DB_NAME
+        app_wrapper.app.config["SQLALCHEMY_DATABASE_URI"] = uri
+        app_wrapper.app.config['LIST_OFFSET'] = 5
+        app_wrapper.app.testing = True
+
+        app_wrapper.go()
+
+        cls.app = app_wrapper.app.test_client()
+        cls.app_wrapper = app_wrapper
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.app_wrapper.engine.dispose()
+        cls.db_teardown_cls()
+
+    def test_packages_prefixes(self):
+        self.assertEqual(qry.get_packages_prefixes(
+            self.app_wrapper.app.config["CACHE_DIR"]),
+            ['b', 'c', 'd', 'f', 'g', 'l', 'libc', 'm',
+             'n', 'o', 'p', 's', 'u'])
+
+    def test_list_versions(self):
+        # Test without suit
+        packages = qry.list_versions(self.session, "gnubg")
+        self.assertEqual([p.version for p in packages],
+                         ["0.90+20091206-4", "0.90+20120429-1", "1.02.000-2"])
+
+        # Test with suit
+        packages = qry.list_versions(self.session, "gnubg", "wheezy")
+        self.assertEqual([p.version for p in packages], ["0.90+20120429-1"])
+
+        # Test returning suites without suit as parameter
+        self.assertTrue({'suites': [u'wheezy'], 'version': u'0.90+20120429-1',
+                         'area': u'main'} in
+                        qry.list_versions_w_suites(self.session, "gnubg"))
+
+        # Test returning suites with a suit as parameter
+        self.assertEqual(qry.list_versions_w_suites(self.session,
+                                                    "gnubg", "jessie"),
+                         [{'suites': [u'jessie', u'sid'],
+                          'version': u'1.02.000-2', 'area': u'main'}])
+
+    def test_find_ctag(self):
+        self.assertEqual(qry.find_ctag(self.session, "swap")[0], 8)
+
+        ctags = qry.find_ctag(self.session, "swap", "gnubg")
+        self.assertEqual(ctags[0], 5)
+        self.assertTrue({'path': 'eval.c', 'line': 1747,
+                        'version': u'0.90+20091206-4', 'package': u'gnubg'}
+                        in ctags[1])
-- 
2.1.4

Attachment: signature.asc
Description: OpenPGP digital signature


Reply to: