Hello, I didn't know someone else was working on this. I am sorry if I am double-working. Here is my patch. Cheers, Orestis
From 659479c85c91a32e6c645549766b98e4bdcc9717 Mon Sep 17 00:00:00 2001 From: Orestis Ioannou <orestis@oioannou.com> Date: Thu, 5 Mar 2015 09:03:09 +0100 Subject: [PATCH] Models: Refactor models to contain only ORM abstraction Closes: #762934 Moved the static queries of each model and non ORM objects to query.py Created a test suite to verify the Queries. --- debsources/app/views.py | 18 +- debsources/models.py | 379 +------------------------------------- debsources/query.py | 387 +++++++++++++++++++++++++++++++++++++++ debsources/tests/test_queries.py | 69 +++++++ 4 files changed, 467 insertions(+), 386 deletions(-) create mode 100644 debsources/query.py create mode 100644 debsources/tests/test_queries.py diff --git a/debsources/app/views.py b/debsources/app/views.py index 129d4ad..e32c536 100644 --- a/debsources/app/views.py +++ b/debsources/app/views.py @@ -36,8 +36,10 @@ from debsources.excepts import ( InvalidPackageOrVersionError, FileOrFolderNotFound, Http500Error, Http404Error, Http404ErrorSuggestions, Http403Error) from debsources.models import ( - Ctag, Package, PackageName, Checksum, Location, Directory, - SourceFile, File, Suite) + Ctag, Package, PackageName, Checksum, File, Suite) +from debsources.query import (Location, Directory, + SourceFile, Queries) + from debsources.app.sourcecode import SourceCodeIterator from debsources.app.forms import SearchForm from debsources.app.infobox import Infobox @@ -62,7 +64,7 @@ def skeleton_variables(): update_ts_file = os.path.join(app.config['CACHE_DIR'], 'last-update') last_update = local_info.read_update_ts(update_ts_file) - packages_prefixes = PackageName.get_packages_prefixes( + packages_prefixes = Queries.get_packages_prefixes( app.config["CACHE_DIR"]) credits_file = os.path.join(app.config["LOCAL_DIR"], "credits.html") @@ -144,7 +146,7 @@ def deal_404_error(error, mode='html'): if isinstance(error, Http404ErrorSuggestions): # let's suggest all the possible locations with a different # package version - possible_versions = PackageName.list_versions( + possible_versions = Queries.list_versions( session, error.package) suggestions = ['/'.join(filter(None, [error.package, v.version, error.path])) @@ -441,7 +443,7 @@ class PrefixView(GeneralView): suite = suite.lower() if suite == "all": suite = "" - if prefix in PackageName.get_packages_prefixes( + if prefix in Queries.get_packages_prefixes( app.config["CACHE_DIR"]): try: if not suite: @@ -501,7 +503,7 @@ class SourceView(GeneralView): suite = "" # we list the version with suites it belongs to try: - versions_w_suites = PackageName.list_versions_w_suites( + versions_w_suites = Queries.list_versions_w_suites( session, packagename, suite) except InvalidPackageOrVersionError: raise Http404Error("%s not found" % packagename) @@ -605,7 +607,7 @@ class SourceView(GeneralView): when 'latest' is provided instead of a version number """ try: - versions = PackageName.list_versions(session, package) + versions = Queries.list_versions(session, package) except InvalidPackageOrVersionError: raise Http404Error("%s not found" % package) # the latest version is the latest item in the @@ -876,7 +878,7 @@ class CtagView(GeneralView): pagination = None slice_ = None - (count, results) = Ctag.find_ctag(session, ctag, slice_=slice_, + (count, results) = Queries.find_ctag(session, ctag, slice_=slice_, package=package) if not self.all_: pagination = Pagination(page, offset, count) diff --git a/debsources/models.py b/debsources/models.py index e4d05fc..5f57246 100644 --- a/debsources/models.py +++ b/debsources/models.py @@ -16,30 +16,17 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. -import os -import magic -import stat -from collections import namedtuple - from sqlalchemy import Column, ForeignKey from sqlalchemy import UniqueConstraint, PrimaryKeyConstraint from sqlalchemy import Index from sqlalchemy import Boolean, Date, DateTime, Integer, LargeBinary, String from sqlalchemy import Enum -from sqlalchemy import and_ -from sqlalchemy import func as sql_func from sqlalchemy.orm import relationship from sqlalchemy.ext.declarative import declarative_base -from debian.debian_support import version_compare -from debsources.excepts import InvalidPackageOrVersionError, \ - FileOrFolderNotFound from debsources.consts import VCS_TYPES, SLOCCOUNT_LANGUAGES, \ - CTAGS_LANGUAGES, METRIC_TYPES, AREAS, PREFIXES_DEFAULT -from debsources import filetype -from debsources.debmirror import SourcePackage -from debsources.consts import SUITES + CTAGS_LANGUAGES, METRIC_TYPES Base = declarative_base() @@ -64,77 +51,6 @@ class PackageName(Base): def __repr__(self): return self.name - @staticmethod - def get_packages_prefixes(cache_dir): - """ - returns the packages prefixes (a, b, ..., liba, libb, ..., y, z) - cache_dir: the cache directory, usually comes from the app config - """ - try: - with open(os.path.join(cache_dir, 'pkg-prefixes')) as f: - prefixes = [l.rstrip() for l in f] - except IOError: - prefixes = PREFIXES_DEFAULT - return prefixes - - @staticmethod - def list_versions(session, packagename, suite=""): - """ - return all versions of a packagename. if suite is specified, only - versions contained in that suite are returned. - """ - try: - name_id = session.query(PackageName) \ - .filter(PackageName.name == packagename) \ - .first().id - except Exception: - raise InvalidPackageOrVersionError(packagename) - try: - if not suite: - versions = session.query(Package) \ - .filter(Package.name_id == name_id).all() - else: - versions = (session.query(Package) - .filter(Package.name_id == name_id) - .filter(sql_func.lower(Suite.suite) - == suite) - .filter(Suite.package_id == Package.id) - .all()) - except Exception: - raise InvalidPackageOrVersionError(packagename) - # we sort the versions according to debian versions rules - versions = sorted(versions, cmp=version_compare) - return versions - - @staticmethod - def list_versions_w_suites(session, packagename, suite=""): - """ - return versions with suites. if suite is provided, then only return - versions contained in that suite. - """ - # FIXME a left outer join on (Package, Suite) is more preferred. - # However, per https://stackoverflow.com/a/997467, custom aggregation - # function to concatenate the suite names for the group_by should be - # defined on database connection level. - versions = PackageName.list_versions(session, packagename, suite) - versions_w_suites = [] - try: - for v in versions: - suites = session.query(Suite) \ - .filter(Suite.package_id == v.id) \ - .all() - # sort the suites according to debsources.consts.SUITES - # use keyfunc to make it py3 compatible - suites.sort(key=lambda s: SUITES['all'].index(s.suite)) - suites = [s.suite for s in suites] - v = v.to_dict() - v['suites'] = suites - versions_w_suites.append(v) - except Exception: - raise InvalidPackageOrVersionError(packagename) - - return versions_w_suites - def to_dict(self): """ simply serializes a package (because SQLAlchemy query results @@ -351,41 +267,6 @@ class Ctag(Base): # .filter(Ctag.tag in ctags) # .filter(Ctag - @staticmethod - def find_ctag(session, ctag, package=None, slice_=None): - """ - Returns places in the code where a ctag is found. - tuple (count, [sliced] results) - - session: an SQLAlchemy session - ctag: the ctag to search - package: limit results to package - """ - - results = (session.query(PackageName.name.label("package"), - Package.version.label("version"), - Ctag.file_id.label("file_id"), - File.path.label("path"), - Ctag.line.label("line")) - .filter(Ctag.tag == ctag) - .filter(Ctag.package_id == Package.id) - .filter(Ctag.file_id == File.id) - .filter(Package.name_id == PackageName.id) - ) - if package is not None: - results = results.filter(PackageName.name == package) - - results = results.order_by(Ctag.package_id, File.path) - count = results.count() - if slice_ is not None: - results = results.slice(slice_[0], slice_[1]) - results = [dict(package=res.package, - version=res.version, - path=res.path, - line=res.line) - for res in results.all()] - return (count, results) - class Metric(Base): __tablename__ = 'metrics' @@ -477,261 +358,3 @@ class HistorySlocCount(Base): def __init__(self, suite, timestamp): self.suite = suite self.timestamp = timestamp - -# it's used in Location.get_stat -# to bypass flake8 complaints, we do not inject the global namespace -# with globals()["LongFMT"] = namedtuple... -LongFMT = namedtuple("LongFMT", ["type", "perms", "size", "symlink_dest"]) - - -class Location(object): - """ a location in a package, can be a directory or a file """ - - def _get_debian_path(self, session, package, version, sources_dir): - """ - Returns the Debian path of a package version. - For example: main/h - contrib/libz - It's the path of a *version*, since a package can have multiple - versions in multiple areas (ie main/contrib/nonfree). - - sources_dir: the sources directory, usually comes from the app config - """ - prefix = SourcePackage.pkg_prefix(package) - - try: - p_id = session.query(PackageName) \ - .filter(PackageName.name == package).first().id - varea = session.query(Package) \ - .filter(and_(Package.name_id == p_id, - Package.version == version)) \ - .first().area - except: - # the package or version doesn't exist in the database - # BUT: packages are stored for a longer time in the filesystem - # to allow codesearch.d.n and others less up-to-date platforms - # to point here. - # Problem: we don't know the area of such a package - # so we try in main, contrib and non-free. - for area in AREAS: - if os.path.exists(os.path.join(sources_dir, area, - prefix, package, version)): - return os.path.join(area, prefix) - - raise InvalidPackageOrVersionError("%s %s" % (package, version)) - - return os.path.join(varea, prefix) - - def __init__(self, session, sources_dir, sources_static, - package, version="", path=""): - """ initialises useful attributes """ - debian_path = self._get_debian_path(session, - package, version, sources_dir) - self.package = package - self.version = version - self.path = path - self.path_to = os.path.join(package, version, path) - - self.sources_path = os.path.join( - sources_dir, - debian_path, - self.path_to) - - self.version_path = os.path.join( - sources_dir, - debian_path, - package, - version) - - if not(os.path.exists(self.sources_path)): - raise FileOrFolderNotFound("%s" % (self.path_to)) - - self.sources_path_static = os.path.join( - sources_static, - debian_path, - self.path_to) - - def is_dir(self): - """ True if self is a directory, False if it's not """ - return os.path.isdir(self.sources_path) - - def is_file(self): - """ True if sels is a file, False if it's not """ - return os.path.isfile(self.sources_path) - - def is_symlink(self): - """ True if a folder/file is a symbolic link file, False if it's not - """ - return os.path.islink(self.sources_path) - - def get_package(self): - return self.package - - def get_version(self): - return self.version - - def get_path(self): - return self.path - - def get_deepest_element(self): - if self.version == "": - return self.package - elif self.path == "": - return self.version - else: - return self.path.split("/")[-1] - - def get_path_to(self): - return self.path_to.rstrip("/") - - @staticmethod - def get_stat(sources_path): - """ - Returns the filetype and permissions of the folder/file - on the disk, unix-styled. - """ - # When porting to Python3, use stat.filemode directly - sources_stat = os.lstat(sources_path) - sources_mode, sources_size = sources_stat.st_mode, sources_stat.st_size - perm_flags = [ - (stat.S_IRUSR, "r", "-"), - (stat.S_IWUSR, "w", "-"), - (stat.S_IXUSR, "x", "-"), - (stat.S_IRGRP, "r", "-"), - (stat.S_IWGRP, "w", "-"), - (stat.S_IXGRP, "x", "-"), - (stat.S_IROTH, "r", "-"), - (stat.S_IWOTH, "w", "-"), - (stat.S_IXOTH, "x", "-"), - ] - # XXX these flags should be enough. - type_flags = [ - (stat.S_ISLNK, "l"), - (stat.S_ISREG, "-"), - (stat.S_ISDIR, "d"), - ] - # add the file type: d/l/- - file_type = " " - for ft, sign in type_flags: - if ft(sources_mode): - file_type = sign - break - file_perms = "" - for (flag, do_true, do_false) in perm_flags: - file_perms += do_true if (sources_mode & flag) else do_false - - file_size = sources_size - - symlink_dest = None - if file_type == "l": - symlink_dest = os.readlink(sources_path) - - return vars(LongFMT(file_type, file_perms, file_size, symlink_dest)) - - @staticmethod - def get_path_links(endpoint, path_to): - """ - returns the path hierarchy with urls, to use with 'You are here:' - [(name, url(name)), (...), ...] - """ - path_dict = path_to.split('/') - pathl = [] - - # we import flask here, in order to permit the use of this module - # without requiring the user to have flask (e.g. bin/debsources-update - # can run in another machine without flask, because it doesn't use - # this method) - from flask import url_for - - for (i, p) in enumerate(path_dict): - pathl.append((p, url_for(endpoint, - path_to='/'.join(path_dict[:i+1])))) - return pathl - - -class Directory(object): - """ a folder in a package """ - - def __init__(self, location, toplevel=False): - # if the directory is a toplevel one, we remove the .pc folder - self.sources_path = location.sources_path - self.toplevel = toplevel - self.location = location - - def get_listing(self): - """ - returns the list of folders/files in a directory, - along with their type (directory/file) - in a tuple (name, type) - """ - def get_type(f): - if os.path.isdir(os.path.join(self.sources_path, f)): - return "directory" - else: - return "file" - get_stat, join_path = self.location.get_stat, os.path.join - listing = sorted(dict(name=f, type=get_type(f), - stat=get_stat(join_path(self.sources_path, f))) - for f in os.listdir(self.sources_path)) - if self.toplevel: - listing = filter(lambda x: x['name'] != ".pc", listing) - - return listing - - -class SourceFile(object): - """ a source file in a package """ - - def __init__(self, location): - self.location = location - self.sources_path = location.sources_path - self.sources_path_static = location.sources_path_static - self.mime = self._find_mime() - - def _find_mime(self): - """ returns the mime encoding and type of a file """ - mime = magic.open(magic.MIME_TYPE) - mime.load() - type_ = mime.file(self.sources_path) - mime.close() - mime = magic.open(magic.MIME_ENCODING) - mime.load() - encoding = mime.file(self.sources_path) - mime.close() - return dict(encoding=encoding, type=type_) - - def get_mime(self): - return self.mime - - def get_sha256sum(self, session): - """ - Queries the DB and returns the shasum of the file. - """ - shasum = session.query(Checksum.sha256) \ - .filter(Checksum.package_id == Package.id) \ - .filter(Package.name_id == PackageName.id) \ - .filter(File.id == Checksum.file_id) \ - .filter(PackageName.name == self.location.package) \ - .filter(Package.version == self.location.version) \ - .filter(File.path == str(self.location.path)) \ - .first() - # WARNING: in the DB path is binary, and here - # location.path is unicode, because the path comes from - # the URL. TODO: check with non-unicode paths - if shasum: - shasum = shasum[0] - return shasum - - def istextfile(self): - """True if self is a text file, False if it's not. - - """ - return filetype.is_text_file(self.mime['type']) - # for substring in text_file_mimes: - # if substring in self.mime['type']: - # return True - # return False - - def get_raw_url(self): - """ return the raw url on disk (e.g. data/main/a/azerty/foo.bar) """ - return self.sources_path_static diff --git a/debsources/query.py b/debsources/query.py new file mode 100644 index 0000000..b8ee557 --- /dev/null +++ b/debsources/query.py @@ -0,0 +1,387 @@ +import os +import stat +from collections import namedtuple + +from debian.debian_support import version_compare + +from debsources import filetype + +from debsources.consts import AREAS, PREFIXES_DEFAULT +from debsources.consts import SUITES + +from debsources.debmirror import SourcePackage + +from debsources.excepts import FileOrFolderNotFound, \ + InvalidPackageOrVersionError + +from debsources.models import ( + Checksum, Ctag, File, Package, PackageName, Suite) + +import magic + +from sqlalchemy import and_ +from sqlalchemy import func as sql_func + + +LongFMT = namedtuple("LongFMT", ["type", "perms", "size", "symlink_dest"]) + + +class Location(object): + """ a location in a package, can be a directory or a file """ + + def _get_debian_path(self, session, package, version, sources_dir): + """ + Returns the Debian path of a package version. + For example: main/h + contrib/libz + It's the path of a *version*, since a package can have multiple + versions in multiple areas (ie main/contrib/nonfree). + + sources_dir: the sources directory, usually comes from the app config + """ + prefix = SourcePackage.pkg_prefix(package) + + try: + p_id = session.query(PackageName) \ + .filter(PackageName.name == package).first().id + varea = session.query(Package) \ + .filter(and_(Package.name_id == p_id, + Package.version == version)) \ + .first().area + except: + # the package or version doesn't exist in the database + # BUT: packages are stored for a longer time in the filesystem + # to allow codesearch.d.n and others less up-to-date platforms + # to point here. + # Problem: we don't know the area of such a package + # so we try in main, contrib and non-free. + for area in AREAS: + if os.path.exists(os.path.join(sources_dir, area, + prefix, package, version)): + return os.path.join(area, prefix) + + raise InvalidPackageOrVersionError("%s %s" % (package, version)) + + return os.path.join(varea, prefix) + + def __init__(self, session, sources_dir, sources_static, + package, version="", path=""): + """ initialises useful attributes """ + debian_path = self._get_debian_path(session, + package, version, sources_dir) + self.package = package + self.version = version + self.path = path + self.path_to = os.path.join(package, version, path) + + self.sources_path = os.path.join( + sources_dir, + debian_path, + self.path_to) + + self.version_path = os.path.join( + sources_dir, + debian_path, + package, + version) + + if not(os.path.exists(self.sources_path)): + raise FileOrFolderNotFound("%s" % (self.path_to)) + + self.sources_path_static = os.path.join( + sources_static, + debian_path, + self.path_to) + + def is_dir(self): + """ True if self is a directory, False if it's not """ + return os.path.isdir(self.sources_path) + + def is_file(self): + """ True if sels is a file, False if it's not """ + return os.path.isfile(self.sources_path) + + def is_symlink(self): + """ True if a folder/file is a symbolic link file, False if it's not + """ + return os.path.islink(self.sources_path) + + def get_package(self): + return self.package + + def get_version(self): + return self.version + + def get_path(self): + return self.path + + def get_deepest_element(self): + if self.version == "": + return self.package + elif self.path == "": + return self.version + else: + return self.path.split("/")[-1] + + def get_path_to(self): + return self.path_to.rstrip("/") + + @staticmethod + def get_stat(sources_path): + """ + Returns the filetype and permissions of the folder/file + on the disk, unix-styled. + """ + # When porting to Python3, use stat.filemode directly + sources_stat = os.lstat(sources_path) + sources_mode, sources_size = sources_stat.st_mode, sources_stat.st_size + perm_flags = [ + (stat.S_IRUSR, "r", "-"), + (stat.S_IWUSR, "w", "-"), + (stat.S_IXUSR, "x", "-"), + (stat.S_IRGRP, "r", "-"), + (stat.S_IWGRP, "w", "-"), + (stat.S_IXGRP, "x", "-"), + (stat.S_IROTH, "r", "-"), + (stat.S_IWOTH, "w", "-"), + (stat.S_IXOTH, "x", "-"), + ] + # XXX these flags should be enough. + type_flags = [ + (stat.S_ISLNK, "l"), + (stat.S_ISREG, "-"), + (stat.S_ISDIR, "d"), + ] + # add the file type: d/l/- + file_type = " " + for ft, sign in type_flags: + if ft(sources_mode): + file_type = sign + break + file_perms = "" + for (flag, do_true, do_false) in perm_flags: + file_perms += do_true if (sources_mode & flag) else do_false + + file_size = sources_size + + symlink_dest = None + if file_type == "l": + symlink_dest = os.readlink(sources_path) + + return vars(LongFMT(file_type, file_perms, file_size, symlink_dest)) + + @staticmethod + def get_path_links(endpoint, path_to): + """ + returns the path hierarchy with urls, to use with 'You are here:' + [(name, url(name)), (...), ...] + """ + path_dict = path_to.split('/') + pathl = [] + + # we import flask here, in order to permit the use of this module + # without requiring the user to have flask (e.g. bin/debsources-update + # can run in another machine without flask, because it doesn't use + # this method) + from flask import url_for + + for (i, p) in enumerate(path_dict): + pathl.append((p, url_for(endpoint, + path_to='/'.join(path_dict[:i+1])))) + return pathl + + +class Directory(object): + """ a folder in a package """ + + def __init__(self, location, toplevel=False): + # if the directory is a toplevel one, we remove the .pc folder + self.sources_path = location.sources_path + self.toplevel = toplevel + self.location = location + + def get_listing(self): + """ + returns the list of folders/files in a directory, + along with their type (directory/file) + in a tuple (name, type) + """ + def get_type(f): + if os.path.isdir(os.path.join(self.sources_path, f)): + return "directory" + else: + return "file" + get_stat, join_path = self.location.get_stat, os.path.join + listing = sorted(dict(name=f, type=get_type(f), + stat=get_stat(join_path(self.sources_path, f))) + for f in os.listdir(self.sources_path)) + if self.toplevel: + listing = filter(lambda x: x['name'] != ".pc", listing) + + return listing + + +class SourceFile(object): + """ a source file in a package """ + + def __init__(self, location): + self.location = location + self.sources_path = location.sources_path + self.sources_path_static = location.sources_path_static + self.mime = self._find_mime() + + def _find_mime(self): + """ returns the mime encoding and type of a file """ + mime = magic.open(magic.MIME_TYPE) + mime.load() + type_ = mime.file(self.sources_path) + mime.close() + mime = magic.open(magic.MIME_ENCODING) + mime.load() + encoding = mime.file(self.sources_path) + mime.close() + return dict(encoding=encoding, type=type_) + + def get_mime(self): + return self.mime + + def get_sha256sum(self, session): + """ + Queries the DB and returns the shasum of the file. + """ + shasum = session.query(Checksum.sha256) \ + .filter(Checksum.package_id == Package.id) \ + .filter(Package.name_id == PackageName.id) \ + .filter(File.id == Checksum.file_id) \ + .filter(PackageName.name == self.location.package) \ + .filter(Package.version == self.location.version) \ + .filter(File.path == str(self.location.path)) \ + .first() + # WARNING: in the DB path is binary, and here + # location.path is unicode, because the path comes from + # the URL. TODO: check with non-unicode paths + if shasum: + shasum = shasum[0] + return shasum + + def istextfile(self): + """True if self is a text file, False if it's not. + + """ + return filetype.is_text_file(self.mime['type']) + # for substring in text_file_mimes: + # if substring in self.mime['type']: + # return True + # return False + + def get_raw_url(self): + """ return the raw url on disk (e.g. data/main/a/azerty/foo.bar) """ + return self.sources_path_static + + +class Queries(object): + + @staticmethod + def get_packages_prefixes(cache_dir): + """ + returns the packages prefixes (a, b, ..., liba, libb, ..., y, z) + cache_dir: the cache directory, usually comes from the app config + """ + try: + with open(os.path.join(cache_dir, 'pkg-prefixes')) as f: + prefixes = [l.rstrip() for l in f] + except IOError: + prefixes = PREFIXES_DEFAULT + return prefixes + + @staticmethod + def list_versions(session, packagename, suite=""): + """ + return all versions of a packagename. if suite is specified, only + versions contained in that suite are returned. + """ + try: + name_id = session.query(PackageName) \ + .filter(PackageName.name == packagename) \ + .first().id + except Exception: + raise InvalidPackageOrVersionError(packagename) + try: + if not suite: + versions = session.query(Package) \ + .filter(Package.name_id == name_id).all() + else: + versions = (session.query(Package) + .filter(Package.name_id == name_id) + .filter(sql_func.lower(Suite.suite) + == suite) + .filter(Suite.package_id == Package.id) + .all()) + except Exception: + raise InvalidPackageOrVersionError(packagename) + # we sort the versions according to debian versions rules + versions = sorted(versions, cmp=version_compare) + return versions + + @staticmethod + def list_versions_w_suites(session, packagename, suite=""): + """ + return versions with suites. if suite is provided, then only return + versions contained in that suite. + """ + # FIXME a left outer join on (Package, Suite) is more preferred. + # However, per https://stackoverflow.com/a/997467, custom aggregation + # function to concatenate the suite names for the group_by should be + # defined on database connection level. + versions = Queries.list_versions(session, packagename, suite) + versions_w_suites = [] + try: + for v in versions: + suites = session.query(Suite) \ + .filter(Suite.package_id == v.id) \ + .all() + # sort the suites according to debsources.consts.SUITES + # use keyfunc to make it py3 compatible + suites.sort(key=lambda s: SUITES['all'].index(s.suite)) + suites = [s.suite for s in suites] + v = v.to_dict() + v['suites'] = suites + versions_w_suites.append(v) + except Exception: + raise InvalidPackageOrVersionError(packagename) + return versions_w_suites + + @staticmethod + def find_ctag(session, ctag, package=None, slice_=None): + """ + Returns places in the code where a ctag is found. + tuple (count, [sliced] results) + + session: an SQLAlchemy session + ctag: the ctag to search + package: limit results to package + """ + + results = (session.query(PackageName.name.label("package"), + Package.version.label("version"), + Ctag.file_id.label("file_id"), + File.path.label("path"), + Ctag.line.label("line")) + .filter(Ctag.tag == ctag) + .filter(Ctag.package_id == Package.id) + .filter(Ctag.file_id == File.id) + .filter(Package.name_id == PackageName.id) + ) + if package is not None: + results = results.filter(PackageName.name == package) + + results = results.order_by(Ctag.package_id, File.path) + count = results.count() + if slice_ is not None: + results = results.slice(slice_[0], slice_[1]) + results = [dict(package=res.package, + version=res.version, + path=res.path, + line=res.line) + for res in results.all()] + return (count, results) diff --git a/debsources/tests/test_queries.py b/debsources/tests/test_queries.py new file mode 100644 index 0000000..0fb7094 --- /dev/null +++ b/debsources/tests/test_queries.py @@ -0,0 +1,69 @@ +import unittest + +from nose.plugins.attrib import attr + +from debsources.query import Queries + +from debsources.tests.db_testing import DbTestFixture +from debsources.tests.testdata import TEST_DB_NAME + + +@attr('Queries') +class QueriesTest(unittest.TestCase, DbTestFixture): + + @classmethod + def setUpClass(cls): + cls.db_setup_cls() + + # creates an app object, which is used to run queries + from debsources.app import app_wrapper + + # erases a few configuration parameters needed for testing: + uri = "postgresql:///" + TEST_DB_NAME + app_wrapper.app.config["SQLALCHEMY_DATABASE_URI"] = uri + app_wrapper.app.config['LIST_OFFSET'] = 5 + app_wrapper.app.testing = True + + app_wrapper.go() + + cls.app = app_wrapper.app.test_client() + cls.app_wrapper = app_wrapper + + @classmethod + def tearDownClass(cls): + cls.app_wrapper.engine.dispose() + cls.db_teardown_cls() + + def test_packages_prefixes(self): + self.assertEqual(Queries.get_packages_prefixes( + self.app_wrapper.app.config["CACHE_DIR"]), + ['b', 'd', 'f', 'g', 'l', 'libc', 'm', 'n', 'o', 's', 'u']) + + def test_list_versions(self): + # Test without suit + packages = Queries.list_versions(self.session, "gnubg") + self.assertEqual([p.version for p in packages], + ["0.90+20091206-4", "0.90+20120429-1", "1.02.000-2"]) + + # Test with suit + packages = Queries.list_versions(self.session, "gnubg", "wheezy") + self.assertEqual([p.version for p in packages], ["0.90+20120429-1"]) + + # Test returning suites without suit as parameter + self.assertTrue({'suites': [u'wheezy'], 'version': u'0.90+20120429-1', + 'area': u'main'} in + Queries.list_versions_w_suites(self.session, "gnubg")) + + # Test returning suites with a suit as parameter + self.assertEqual(Queries.list_versions_w_suites(self.session, "gnubg", "jessie"), + [{'suites': [u'jessie', u'sid'], + 'version': u'1.02.000-2', 'area': u'main'}]) + + def test_find_ctag(self): + self.assertEqual(Queries.find_ctag(self.session, "swap")[0], 8) + + ctags = Queries.find_ctag(self.session, "swap", "gnubg") + self.assertEqual(ctags[0], 5) + self.assertTrue({'path': 'eval.c', 'line': 1747, + 'version': u'0.90+20091206-4', 'package': u'gnubg'} + in ctags[1]) -- 2.1.4
Attachment:
signature.asc
Description: OpenPGP digital signature