Automatically check the licenses of package dependencies.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
licheck/licheck/api.py

413 lines
13 KiB

#
# api.py
#
# Copyright (C) 2021 frnmst (Franco Masotti) <franco.masotti@live.com>
#
# This file is part of licheck.
#
# licheck is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# licheck is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with licheck. If not, see <http://www.gnu.org/licenses/>.
#
import hashlib
import json
import logging
import pathlib
import re
import shlex
import shutil
import subprocess
from urllib.parse import urlparse
import requests
import yaml
from appdirs import AppDirs
from tabulate import tabulate
from .constants import common_defaults
from .exceptions import (BinaryDoesNotExist, InvalidCache,
InvalidCommonDataStructure, InvalidConfiguration,
InvalidOutput)
def build_command(binary: str, license_program: str, file: str) -> str:
    r"""Build a command to let the license programs discover all packages.

    :param binary: path of the license-program executable.
    :param license_program: name of the supported license program.
    :param file: dependency file to be scanned.
    :returns: the full command line, or an empty string for an
        unknown license program.
    """
    if license_program != 'dep_license':
        return str()
    # JSON output is requested so get_data can parse it programmatically.
    return ' '.join([binary, '--dev', '--format', 'json', file])
def create_common_data_structure(input: list, license_program: str, file: str) -> list:
    r"""Create a data structure common to all outputs.

    :param input: raw records produced by the license program.
    :param license_program: name of the supported license program.
    :param file: name of the scanned file, copied into every record.
    :returns: a list of normalized package dicts; empty for an
        unknown license program.
    """
    output = list()
    if license_program == 'dep_license':
        for record in input:
            output.append({
                'package': record['Name'],
                # These two elements should be lists of licenses.
                'license_short': [record['Meta']],
                'license_long': [record['Classifier']],
                # Add the file name.
                'file': file,
                'version': str(),
            })
    return output
def get_data(command: str, license_program: str, file: str) -> dict:
    r"""Return an object with the output of the licenses.

    :param command: full command line to execute.
    :param license_program: name of the supported license program.
    :param file: scanned file (currently unused, kept for interface parity).
    :returns: the parsed JSON object, or an empty dict for an
        unknown license program.
    :raises InvalidOutput: if the program output does not conform.
    :raises subprocess.CalledProcessError: if the command exits non-zero.
    """
    p = subprocess.run(
        shlex.split(command),
        capture_output=True,
        check=True
    )
    output = p.stdout.decode('UTF-8').strip()
    data = dict()
    if license_program == 'dep_license':
        # Output must conform. Raw strings so '\d' is a regex digit
        # class and not an (invalid) Python escape sequence.
        if not re.match(r'(Found dependencies: \d+|no dependencies found)', output):
            raise InvalidOutput
        # Sanitize output so json can be loaded without problems.
        output = re.sub(r'Found dependencies: \d+', '', output)
        output = re.sub('no dependencies found', '{}', output)
        data = json.loads(output)
    return data
def transform_cache_to_data(cache: dict, file: str, file_checksum: str) -> list:
    r"""Expand the cached entries of one file into the common data structure.

    :param cache: the whole cache object keyed by file checksum.
    :param file: name of the file the entries belong to.
    :param file_checksum: checksum selecting the entries to expand.
    :returns: a list of normalized package dicts.
    :raises InvalidCache: if the cache structure is not valid.
    """
    if not check_cache_structure(cache):
        raise InvalidCache
    output = list()
    for file_id, entries in cache.items():
        if file_id != file_checksum:
            continue
        for entry in entries:
            output.append({
                # Required.
                'package': entry['p'],
                # These elements should be lists of licenses.
                'license_short': entry['s'],
                # Required.
                'license_long': entry['l'],
                # Required: the file the packages belong to.
                'file': file,
                'version': entry['v'],
            })
    return output
def read_yaml_file(file: str) -> dict:
    r"""Load a YAML file.

    :param file: path of the YAML file.
    :returns: the parsed object, or an empty dict if the file
        does not exist.
    """
    data = dict()
    if pathlib.Path(file).is_file():
        # Context manager so the handle is always closed
        # (the original passed an open() result without closing it).
        with open(file, 'r') as f:
            data = yaml.load(f, Loader=yaml.SafeLoader)
    return data
def write_cache(packages: list, file_checksum: str, cache_file: str):
    r"""Append the packages of one file to the cache file.

    :param packages: list of normalized package dicts.
    :param file_checksum: checksum of the scanned file, used as cache key.
    :param cache_file: path of the YAML cache file.
    :raises InvalidCommonDataStructure: if packages is malformed.
    """
    if not check_common_data_structure(packages):
        raise InvalidCommonDataStructure
    entries = [
        {
            'p': pkg['package'],
            's': pkg['license_short'],
            'l': pkg['license_long'],
            'v': pkg['version'],
        }
        for pkg in packages
    ]
    # Append mode: entries previously written for other files are kept.
    with open(cache_file, 'a') as f:
        f.write(yaml.dump({file_checksum: entries}))
def check_licenses(packages: list, licenses_allowed: list) -> list:
    r"""Collect the packages whose licenses are not approved.

    :param packages: list of normalized package dicts.
    :param licenses_allowed: container of approved license names.
    :returns: the offending packages; a package appears once per
        unapproved license, or once if it has no license at all.
    :raises InvalidCommonDataStructure: if packages is malformed.
    """
    if not check_common_data_structure(packages):
        raise InvalidCommonDataStructure
    errors = list()
    for package in packages:
        licenses = package['license_long']
        # No license information at all is also an error.
        if not licenses:
            errors.append(package)
        for candidate in licenses:
            if candidate not in licenses_allowed:
                errors.append(package)
    return errors
def print_errors(packages: list, cut_output: bool = False) -> None:
    r"""Log a warning table of packages with unapproved licenses.

    Note: *packages* is modified in place (long license names are
    truncated when cut_output is True, empty versions become '-').

    :param packages: list of normalized package dicts.
    :param cut_output: truncate table cells longer than the configured
        maximum.
    :raises InvalidCommonDataStructure: if packages is malformed.

    The return annotation was wrongly ``str``: nothing is returned.
    """
    if not check_common_data_structure(packages):
        raise InvalidCommonDataStructure
    # Hoisted loop invariant.
    limit = common_defaults['table element max length']
    # Re-format the output.
    for package in packages:
        if cut_output:
            # Truncate over-long license names so the table stays readable.
            for key in ('license_short', 'license_long'):
                package[key] = [
                    x[:limit] + '...' if len(x) > limit else x
                    for x in package[key]
                ]
        if package['version'] == str():
            package['version'] = '-'
    logging.basicConfig(format='WARNING:licheck:\n%(message)s')
    logging.warning('unapproved licenses')
    logging.warning(tabulate(packages, headers="keys"))
def get_binary_and_program(language: str) -> tuple:
    r"""Get the license binary path and the program name.

    :param language: programming language of the project to scan.
    :returns: a tuple ``(binary_path, program_name)``.
    :raises BinaryDoesNotExist: if the binary is not on the PATH.

    The return annotation was wrongly ``str``: a tuple is returned.
    """
    binary = str()
    program_name = str()
    if language == 'python':
        binary = 'deplic'
        program_name = 'dep_license'
    # Resolve once instead of calling shutil.which() three times.
    binary_path = shutil.which(binary)
    if binary_path is None:
        raise BinaryDoesNotExist
    # Informational print of the resolved path (kept from the original).
    print(binary_path)
    return binary_path, program_name
def check_configuration_structure(data_struct: dict, local: bool = True) -> bool:
    r"""Check that a configuration object carries the expected keys and types.

    :param data_struct: the parsed configuration.
    :param local: True for a local configuration, False for a remote one.
    :returns: True if the structure is valid.
    """

    def _typed(key: str, expected) -> bool:
        # The key must exist and hold a value of the expected type.
        return key in data_struct and isinstance(data_struct[key], expected)

    base_ok = (_typed('language', str)
               and _typed('include', list)
               and _typed('files to check', list)
               and _typed('allowed licenses', list))
    if local:
        ok = base_ok
        if ok:
            # Included files must be reachable over HTTP(S).
            for url in data_struct['include']:
                if urlparse(url).scheme not in ('http', 'https'):
                    ok = False
    else:
        # Remote configurations must not nest further includes
        # nor declare files to check.
        ok = (base_ok
              and data_struct['include'] == list()
              and data_struct['files to check'] == list())
        if ok:
            for lic in data_struct['allowed licenses']:
                if not isinstance(lic, str):
                    ok = False
    return ok
def check_cache_structure(cache: dict) -> bool:
    r"""Check that a cache object has the expected structure.

    Every package entry must carry the keys 'l', 'p', 's', 'v' with
    the expected types, and 'l' / 's' must be lists of strings.
    An empty cache is a valid cache.

    Fixes two defects of the original implementation: a single
    well-formed entry no longer masks malformed siblings (which then
    crashed the second validation pass with a KeyError), and a file
    whose cached entry list is empty is accepted.

    :param cache: the cache object keyed by file checksum.
    :returns: True if the structure is valid.
    """
    for packages in cache.values():
        for package in packages:
            # Required keys with the required types.
            if not ('l' in package
                    and 'p' in package
                    and 's' in package
                    and 'v' in package
                    and isinstance(package['l'], list)
                    and isinstance(package['p'], str)
                    and isinstance(package['s'], list)
                    and isinstance(package['v'], str)):
                return False
            # All long and short license names must be strings.
            if not all(isinstance(lic, str)
                       for lic in package['l'] + package['s']):
                return False
    return True
def check_common_data_structure(data: list) -> bool:
    r"""Check that data is a non-empty list of well-formed package records.

    :param data: list of package dicts in the common data structure.
    :returns: True if data is non-empty and every record is valid.
    """

    def _valid(d) -> bool:
        # One record: five required keys with the right types,
        # plus non-empty package and file names.
        return (isinstance(d, dict)
                and 'package' in d
                and 'license_short' in d
                and 'license_long' in d
                and 'file' in d
                and 'version' in d
                and isinstance(d['package'], str)
                and isinstance(d['license_short'], list)
                and isinstance(d['license_long'], list)
                and isinstance(d['file'], str)
                and isinstance(d['version'], str)
                and len(d['package']) > 0
                and len(d['file']) > 0)

    # An empty list is not a valid structure.
    return bool(data) and all(_valid(d) for d in data)
def read_configuration_file(file: str, local: bool = True) -> tuple:
    r"""Read and validate a configuration file.

    :param file: path of the YAML configuration file.
    :param local: True for a local configuration, False for a remote one.
    :returns: ``(allowed licenses, files to check, language, include)``.
    :raises InvalidConfiguration: if the structure is not valid.
    """
    configuration = read_yaml_file(file)
    if not check_configuration_structure(configuration, local):
        raise InvalidConfiguration
    keys = ('allowed licenses', 'files to check', 'language', 'include')
    return tuple(configuration[k] for k in keys)
def read_remote_files(include_files: list, cache_dir: str) -> list:
    r"""Download remote configuration files and collect their allowed licenses.

    Each remote file is cached on disk under the SHA-512 of its URL,
    so it is downloaded at most once.

    :param include_files: list of HTTP(S) URLs to fetch.
    :param cache_dir: directory where downloaded files are cached.
    :returns: the de-duplicated list of allowed licenses.
    :raises requests.RequestException: on network failure, timeout,
        or HTTP error status.
    """
    allowed_lic = list()
    for include in include_files:
        # Cache key: checksum of the URL itself, not of the content.
        checksum = hashlib.sha512(include.encode('UTF-8')).hexdigest()
        checksum += '.yml'
        full_path = pathlib.Path(cache_dir, checksum)
        if not full_path.is_file():
            # A timeout avoids hanging forever; raise_for_status avoids
            # caching an HTTP error page as if it were a configuration.
            r = requests.get(include, timeout=30)
            r.raise_for_status()
            with open(full_path, 'wb') as f:
                f.write(r.content)
        allowed_licenses, files_to_check, language, include = read_configuration_file(full_path, local=False)
        allowed_lic += allowed_licenses
    # Avoid duplicate elements.
    return list(set(allowed_lic))
def create_files_data_structure(files_to_check: list) -> dict:
    r"""Map each file to the SHA-512 checksum of its content.

    The checksum is used on the next run to decide whether the
    package metadata must be downloaded again or can come from the
    cache.

    :param files_to_check: list of file paths.
    :returns: dict mapping each given path to its content checksum.
    """
    files_struct = dict()
    for f in files_to_check:
        full_path = pathlib.Path(f).absolute()
        # read_bytes() opens and closes the handle
        # (the original leaked the handle returned by open()).
        files_struct[f] = hashlib.sha512(full_path.read_bytes()).hexdigest()
    return files_struct
def pipeline(configuration_file: str,
             clear_cache: bool = False,
             cut_table_output: bool = False):
    r"""Run the whole license check: configuration, scan, cache, report.

    :param configuration_file: path of the local YAML configuration file.
    :param clear_cache: when True, wipe the cache directory first.
    :param cut_table_output: when True, truncate long table cells in the
        error report.
    :raises InvalidCache: if the on-disk cache has an unexpected structure.
    :raises InvalidConfiguration: if a configuration file is malformed.
    """
    dirs = AppDirs('licheck')
    # Handle the cache.
    cache_dir = dirs.user_cache_dir
    if clear_cache:
        shutil.rmtree(cache_dir, ignore_errors=True)
    pathlib.Path(cache_dir).mkdir(mode=0o700,exist_ok=True)
    cache_file = str(pathlib.Path(dirs.user_cache_dir, common_defaults['cache file']))
    cache = read_yaml_file(cache_file)
    if not check_cache_structure(cache):
        raise InvalidCache
    # Read the configuration file.
    allowed_licenses, files_to_check, language, include = read_configuration_file(configuration_file, local=True)
    # Load remote files.
    allowed_licenses += read_remote_files(include, cache_dir)
    # NOTE(review): allowed_licenses becomes a set here although
    # check_licenses annotates a list; membership tests still work.
    allowed_licenses = set(allowed_licenses)
    files_struct = create_files_data_structure(files_to_check)
    full_list = list()
    out = list()
    i = 0
    # Go through the files with the package dependencies.
    for file in files_struct:
        # Load data from cache or call an external program.
        if files_struct[file] in cache:
            # Cache hit: the file content is unchanged since the last run.
            output = transform_cache_to_data(cache, file, files_struct[file])
        else:
            # Cache miss: run the external license program on the file.
            binary, program = get_binary_and_program(language)
            command = build_command(binary, program, file)
            data = get_data(command, program, file)
            output = create_common_data_structure(data, program, file)
        full_list += output
        out.append(output)
        i += 1
    # Remove the cache file since we need to write it again
    # with updated data.
    if pathlib.Path(cache_file).is_file():
        pathlib.Path(cache_file).unlink()
    i = 0
    for file in files_struct:
        write_cache(out[i], files_struct[file], cache_file)
        i += 1
    errors = check_licenses(full_list, allowed_licenses)
    if len(errors) > 0:
        print_errors(errors, cut_table_output)
if __name__ == '__main__':
    # Import-only module: there is no command line entry point here.
    pass