API change. Use of a single write operation for the cache file.

master
Franco Masotti 9 months ago
parent f745aa5e08
commit 0a837544b8
Signed by: frnmst
GPG Key ID: 24116ED85666780A
  1. 4
      licheck/__init__.py
  2. 133
      licheck/api.py
  3. 8
      licheck/exceptions.py
  4. 70
      licheck/tests/tests.py

@ -29,7 +29,7 @@ from .api import (build_command, check_cache_structure,
read_remote_files, read_yaml_file,
transform_cache_to_data_object, write_cache)
from .cli import CliInterface
from .exceptions import (BinaryDoesNotExist,
from .exceptions import (BinaryDoesNotExist, IncoherentData,
IncoherentProgrammingLanguageValue, InvalidCache,
InvalidCommonDataStructure, InvalidConfiguration,
InvalidOutput)
InvalidOutput, NotAChecksum)

@ -37,10 +37,10 @@ from appdirs import AppDirs
from tabulate import tabulate
from .constants import common_defaults, programs
from .exceptions import (BinaryDoesNotExist,
from .exceptions import (BinaryDoesNotExist, IncoherentData,
IncoherentProgrammingLanguageValue, InvalidCache,
InvalidCommonDataStructure, InvalidConfiguration,
InvalidOutput)
InvalidOutput, NotAChecksum)
def build_command(binary: str, license_program: str, file: str) -> str:
@ -147,6 +147,8 @@ def transform_cache_to_data_object(cache: dict, file: str, file_checksum: str) -
"""
if not check_cache_structure(cache):
raise InvalidCache
if not is_sha512_checksum(file_checksum):
raise NotAChecksum
i = 0
output = list()
@ -180,7 +182,7 @@ def read_yaml_file(file: str) -> dict:
:parameter file: the file name.
:type file: str
:returns: an objects.
:returns: an object.
:rtype: dict
:raises: a PyYAML or a built-in exception.
"""
@ -191,21 +193,60 @@ def read_yaml_file(file: str) -> dict:
return data
def write_cache(packages: list, file_checksum: str, cache_file: str):
r"""Write an object as a YAML file.
def check_dependencies_files_data_structure(dependencies_files: dict):
    r"""Check that the data structure is a dict of filenames and checksums.

    Each key must be a filename string and each value must be the
    hexadecimal SHA512 checksum of that file's content.

    :parameter dependencies_files: the data structure.
    :type dependencies_files: dict
    :raises: TypeError, NotAChecksum or a built-in exception.
    """
    for filename, checksum in dependencies_files.items():
        if not isinstance(filename, str):
            raise TypeError
        if not is_sha512_checksum(checksum):
            raise NotAChecksum
def is_sha512_checksum(string: str) -> bool:
    r"""Check that a string is a valid hex representation of an SHA512 checksum.

    :parameter string: a string.
    :type string: str
    :returns: ``True`` if the string is a valid hexadecimal representation of
        a SHA512 checksum, ``False`` otherwise.
    :rtype: bool
    :raises: a built-in exception.
    """
    # Use hashlib representation.
    # See also:
    # https://datatracker.ietf.org/doc/html/rfc4634#section-4.2
    # https://csrc.nist.gov/csrc/media/publications/fips/180/3/archive/2008-10-31/documents/fips180-3_final.pdf
    # len(m.digest()) = 64 bytes = 512 bits = 512/4 hex = 128 hex
    #
    # re.fullmatch is required here: re.match anchors only at the start of
    # the string, so a valid 128-hex-digit prefix followed by extra
    # characters (e.g. a 129-character string) would be wrongly accepted.
    regex = r'[0-9a-f]{128}'
    return re.fullmatch(regex, string) is not None
def create_cache_output(packages: list, file_checksum: str, table: dict):
r"""Populate an object with relevant data.
:parameter packages: an object with a common structure in this program.
:parameter file_checksum: the SHA-512 checksum of the file content.
:parameter cache_file: the file where to write the output
:parameter table: the object to be populated.
:type packages: list
:type file_checksum: str
:type cache_file: str
:raises: a PyYAML or a built-in exception.
:type table: dict
:raises: InvalidCommonDataStructure or a built-in exception.
"""
if not check_data_object_structure(packages):
raise InvalidCommonDataStructure
if not is_sha512_checksum(file_checksum):
raise NotAChecksum
table = dict()
table[file_checksum] = list()
for package in packages:
table[file_checksum].append({
@ -215,11 +256,55 @@ def write_cache(packages: list, file_checksum: str, cache_file: str):
'v': package['version'],
})
# Write one file at the time using the append operation.
with open(cache_file, 'a') as f:
def write_cache(table: dict, cache_file: str):
    r"""Write an object as a YAML file.

    :parameter table: an object with the cache.
    :parameter cache_file: the file where to write the output.
    :type table: dict
    :type cache_file: str
    :raises: a PyYAML or a built-in exception.
    """
    # Serialize first so any PyYAML error surfaces before the file is
    # truncated, then replace the whole file in a single write.
    serialized = yaml.dump(table)
    with open(cache_file, 'w') as cache_stream:
        cache_stream.write(serialized)
def save_cache(data: list, existing_cache: dict, files_struct: dict, cache_file: str):
    r"""Save existing and new cache.

    :parameter data: an object containing the data formatted for this program.
    :parameter existing_cache: pre-existing data before running this program.
    :parameter files_struct: an object with file names and their checksums.
    :parameter cache_file: the file where to write the output.
    :type data: list
    :type existing_cache: dict
    :type files_struct: dict
    :type cache_file: str
    :raises: InvalidCache, IncoherentData or a built-in exception.
    """
    if not check_cache_structure(existing_cache):
        raise InvalidCache
    check_dependencies_files_data_structure(files_struct)
    # `data` is expected to hold one entry per existing cache key plus one
    # per dependency file.
    if len(data) != len(existing_cache) + len(files_struct):
        raise IncoherentData
    # Unite all checksums in the same struct: the keys of the existing
    # cache followed by the checksums of the current dependency files.
    union = list(existing_cache)
    union.extend(files_struct[file] for file in files_struct)
    # Remove duplicates while preserving insertion order. Using
    # list(set(...)) here would reorder the checksums nondeterministically,
    # making the pairing of data[i] with union[i] below arbitrary.
    union = list(dict.fromkeys(union))
    table = dict()
    # zip stops at len(union) <= len(data), matching the original
    # range(len(union)) indexing.
    for packages, checksum in zip(data, union):
        create_cache_output(packages, checksum, table)
    write_cache(table, cache_file)
def check_licenses(packages: list, licenses_allowed: list, include_empty_as_errors: bool = True) -> list:
r"""Filter packages to include only the ones with errors.
@ -571,7 +656,7 @@ def read_remote_files(include_files: list, cache_dir: str) -> list:
def create_dependencies_files_data_structure(dependencies_files: list) -> dict:
r"""Create an object that couples file names and checksums.
r"""Create an object that couples file names and their checksums.
:parameter dependencies_files: a list of files containing the dependencies to be checked.
:type dependencies_files: list
@ -627,9 +712,12 @@ def pipeline(configuration_file: str = '.allowed_licenses.yml',
allowed_licenses += read_remote_files(include, cache_dir)
allowed_licenses = set(allowed_licenses)
# Get filenames and checksum for the current repository.
files_struct = create_dependencies_files_data_structure(dependencies_files)
full_list = list()
out = list()
# Filter cache not present in current files. This is necessary
# when you run licheck on different files (e.g: on different
# repositories).
@ -640,7 +728,6 @@ def pipeline(configuration_file: str = '.allowed_licenses.yml',
for c in cache_subset:
out.append(transform_cache_to_data_object(cache_subset, 'dummy', c))
i = 0
# Go through the files with the package dependencies.
for file in files_struct:
# Load data from cache or call an external program.
@ -656,24 +743,8 @@ def pipeline(configuration_file: str = '.allowed_licenses.yml',
full_list += output
out.append(output)
i += 1
# Remove the cache file since we need to write it again
# with updated data.
if pathlib.Path(cache_file).is_file():
pathlib.Path(cache_file).unlink()
# Preserve unmodified existing cache.
i = 0
for c in cache_subset:
write_cache(out[i], c, cache_file)
i += 1
# Add or update cache.
i = 0
for file in files_struct:
write_cache(out[i], files_struct[file], cache_file)
i += 1
# out = cache + new files.
save_cache(out, cache_subset, files_struct, cache_file)
errors = check_licenses(full_list, allowed_licenses)
if len(errors) > 0:

@ -43,3 +43,11 @@ class InvalidCommonDataStructure(Exception):
class IncoherentProgrammingLanguageValue(Exception):
r"""The programming language value is not uniform."""
class NotAChecksum(Exception):
    r"""Raised when a string is not a valid hexadecimal SHA512 checksum."""
class IncoherentData(Exception):
    r"""Raised when the input data does not match what was expected."""

@ -147,7 +147,7 @@ Found dependencies: 3
r"""Test transform_cache_to_data_object."""
# Test a valid cache with file_checksum == id
cache = {
'id0': [
'a0' * 64: [
{
'p': 'pkg0',
's': ['short'],
@ -155,7 +155,7 @@ Found dependencies: 3
'v': 'ver0',
},
],
'id1': [
'a1' * 64: [
{
'p': 'pkg1',
's': ['short'],
@ -168,7 +168,7 @@ Found dependencies: 3
m = unittest.mock.Mock(return_value=True)
with patch('licheck.api.check_cache_structure', m):
self.assertEqual(
api.transform_cache_to_data_object(cache, 'file', 'id0'),
api.transform_cache_to_data_object(cache, 'file', 'a0' * 64),
[
{
'package': 'pkg0',
@ -182,7 +182,7 @@ Found dependencies: 3
# Test a cache without corresponding file_checksum == id.
cache = {
'id1': [
'a1' * 64: [
{
'p': 'pkg0',
's': ['short'],
@ -194,7 +194,7 @@ Found dependencies: 3
m = unittest.mock.Mock(return_value=True)
with patch('licheck.api.check_cache_structure', m):
self.assertEqual(
api.transform_cache_to_data_object(cache, 'file', 'id0'),
api.transform_cache_to_data_object(cache, 'file', 'a0' * 64),
[
]
)
@ -209,31 +209,36 @@ Found dependencies: 3
def test_read_yaml_file(self):
r"""Test read_yaml_file."""
@unittest.skip("empty test")
def test_write_cache(self):
r"""Test write_cache."""
r"""Test write_yaml_file."""
def test_save_cache(self):
r"""Test save_cache."""
def test_create_cache_output(self):
r"""Test create_cache_output."""
# Test a valid structure.
m = unittest.mock.mock_open()
with patch('builtins.open', m):
pkgs = [
{
'package': 'pkg0',
'license_short': ['short'],
'license_long': ['long'],
'file': 'file',
'version': 'ver0',
},
{
'package': 'pkg1',
'license_short': ['short'],
'license_long': ['long'],
'file': 'file',
'version': 'ver1',
},
]
api.write_cache(pkgs, '0a0a', 'hey.txt')
handle = m()
handle.write.assert_called_once_with(yaml.dump({
'0a0a':
packages = [
{
'package': 'pkg0',
'license_short': ['short'],
'license_long': ['long'],
'file': 'file',
'version': 'ver0',
},
{
'package': 'pkg1',
'license_short': ['short'],
'license_long': ['long'],
'file': 'file',
'version': 'ver1',
},
]
file_checksum = '0a' * 64
table = dict()
expected_table = {
'0a' * 64:
[
{
'p': 'pkg0',
@ -249,13 +254,18 @@ Found dependencies: 3
},
],
}
))
api.create_cache_output(packages, file_checksum, table)
self.assertEqual(table, expected_table)
# Test if invalid common data structure function is called.
m = unittest.mock.Mock(return_value=False)
with patch('licheck.api.check_data_object_structure', m):
with self.assertRaises(exceptions.InvalidCommonDataStructure):
api.check_licenses(list(), list())
api.create_cache_output(packages, file_checksum, table)
m = unittest.mock.Mock(return_value=False)
with patch('licheck.api.is_sha512_checksum', m):
with self.assertRaises(exceptions.NotAChecksum):
api.create_cache_output(packages, file_checksum, table)
def test_check_licenses(self):
r"""Test check_licenses."""

Loading…
Cancel
Save