Skip to content

Commit

Permalink
Merge pull request #103 from fossology/fix/uploadType
Browse files Browse the repository at this point in the history
Fix/upload type required by API version 1.5.1
  • Loading branch information
deveaud-m authored Mar 22, 2023
2 parents edb3f71 + 3b19555 commit d6b1b38
Show file tree
Hide file tree
Showing 13 changed files with 1,155 additions and 786 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/fossologytests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:

jobs:
test-latest:
name: Integration Tests (latest Fossology)
name: Integration Tests (latest Fossology - 4.2.1)
runs-on: ubuntu-latest

container:
Expand All @@ -20,7 +20,7 @@ jobs:

services:
fossology:
image: fossology/fossology:latest
image: fossology/fossology:4.2.1
ports:
- 8081:80
volumes:
Expand Down
238 changes: 5 additions & 233 deletions fossology/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,24 @@
import logging
import re
from datetime import date, timedelta
from typing import Dict, List

import requests

from fossology.exceptions import (
AuthenticationError,
AuthorizationError,
FossologyApiError,
FossologyUnsupported,
)
from fossology.exceptions import AuthenticationError, FossologyApiError
from fossology.folders import Folders
from fossology.groups import Groups
from fossology.jobs import Jobs
from fossology.license import LicenseEndpoint
from fossology.obj import (
Agents,
ApiInfo,
File,
HealthInfo,
SearchTypes,
TokenScope,
Upload,
User,
get_options,
)
from fossology.obj import Agents, ApiInfo, HealthInfo, TokenScope, User, versiontuple
from fossology.report import Report
from fossology.search import Search
from fossology.uploads import Uploads
from fossology.users import Users

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def versiontuple(v):
return tuple(map(int, (v.split("."))))


def search_headers(
searchType: SearchTypes = SearchTypes.ALLFILES,
upload: Upload = None,
filename: str = None,
tag: str = None,
filesizemin: int = None,
filesizemax: int = None,
license: str = None,
copyright: str = None,
group: str = None,
) -> Dict:
headers = {"searchType": searchType.value}
if upload:
headers["uploadId"] = str(upload.id)
if filename:
headers["filename"] = filename
if tag:
headers["tag"] = tag
if filesizemin:
headers["filesizemin"] = filesizemin
if filesizemax:
headers["filesizemax"] = filesizemax
if license:
headers["license"] = license
if copyright:
headers["copyright"] = copyright
if group:
headers["groupName"] = group
return headers


def fossology_token(
url, username, password, token_name, token_scope=TokenScope.READ, token_expire=None
):
Expand Down Expand Up @@ -128,7 +79,7 @@ def fossology_token(
exit(f"Server {url} does not seem to be running or is unreachable: {error}")


class Fossology(Folders, Groups, LicenseEndpoint, Uploads, Jobs, Report):
class Fossology(Folders, Groups, LicenseEndpoint, Uploads, Jobs, Report, Users, Search):

"""Main Fossology API class
Expand Down Expand Up @@ -257,182 +208,3 @@ def get_health(self) -> ApiInfo:
else:
description = "Error while getting health info"
raise FossologyApiError(description, response)

def detail_user(self, user_id):
"""Get details of Fossology user.
API Endpoint: GET /users/{id}
:param id: the ID of the user
:type id: int
:return: the requested user's details
:rtype: User
:raises FossologyApiError: if the REST call failed
"""
response = self.session.get(f"{self.api}/users/{user_id}")
if response.status_code == 200:
user_agents = None
user_details = response.json()
if user_details.get("agents"):
user_agents = Agents.from_json(user_details["agents"])
user = User.from_json(user_details)
user.agents = user_agents
return user
else:
description = f"Error while getting details for user {user_id}"
raise FossologyApiError(description, response)

def list_users(self):
"""List all users from the Fossology instance
API Endpoint: GET /users
:return: the list of users
:rtype: list of User
:raises FossologyApiError: if the REST call failed
"""
response = self.session.get(f"{self.api}/users")
if response.status_code == 200:
users_list = list()
for user in response.json():
if user.get("name") == "Default User":
continue
if user.get("email"):
foss_user = User.from_json(user)
agents = user.get("agents")
if agents:
foss_user.agents = Agents.from_json(agents)
users_list.append(foss_user)
return users_list
else:
description = f"Unable to get a list of users from {self.host}"
raise FossologyApiError(description, response)

def delete_user(self, user):
"""Delete a Fossology user.
API Endpoint: DELETE /users/{id}
:param user: the user to be deleted
:type user: User
:raises FossologyApiError: if the REST call failed
"""
response = self.session.delete(f"{self.api}/users/{user.id}")

if response.status_code == 202:
return
else:
description = f"Error while deleting user {user.name} ({user.id})"
raise FossologyApiError(description, response)

def search(
self,
searchType: SearchTypes = SearchTypes.ALLFILES,
upload: Upload = None,
filename: str = None,
tag: str = None,
filesizemin: int = None,
filesizemax: int = None,
license: str = None,
copyright: str = None,
group: str = None,
):
"""Search for a specific file
API Endpoint: GET /search
:param searchType: Limit search to: directory, allfiles (default), containers
:param upload: Limit search to a specific upload
:param filename: Filename to find, can contain % as wild-card
:param tag: tag to find
:param filesizemin: Min filesize in bytes
:param filesizemax: Max filesize in bytes
:param license: License search filter
:param copyright: Copyright search filter
:param group: the group name to choose while performing search (default: None)
:type searchType: one of SearchTypes Enum
:type upload: Upload
:type filename: string
:type tag: string
:type filesizemin: int
:type filesizemax: int
:type license: string
:type copyright: string
:type group: string
:return: list of items corresponding to the search criteria
:rtype: JSON
:raises FossologyApiError: if the REST call failed
:raises AuthorizationError: if the user can't access the group
"""
headers = search_headers(
searchType,
upload,
filename,
tag,
filesizemin,
filesizemax,
license,
copyright,
group,
)
response = self.session.get(f"{self.api}/search", headers=headers)

if response.status_code == 200:
return response.json()

elif response.status_code == 403:
description = f"Searching {get_options(group)}not authorized"
raise AuthorizationError(description, response)

else:
description = "Unable to get a result with the given search criteria"
raise FossologyApiError(description, response)

def filesearch(
self,
filelist: List = [],
group: str = None,
):
"""Search for files from hash sum
API Endpoint: POST /filesearch
The response does not generate Python objects yet, the plain JSON data is simply returned.
:param filelist: the list of files (or containers) hashes to search for (default: [])
:param group: the group name to choose while performing search (default: None)
:type filelist: list
:return: list of items corresponding to the search criteria
:type group: string
:rtype: JSON
:raises FossologyApiError: if the REST call failed
:raises AuthorizationError: if the user can't access the group
"""
if versiontuple(self.version) <= versiontuple("1.0.16"):
description = f"Endpoint /filesearch is not supported by your Fossology API version {self.version}"
raise FossologyUnsupported(description)

headers = {}
if group:
headers["groupName"] = group

response = self.session.post(
f"{self.api}/filesearch", headers=headers, json=filelist
)

if response.status_code == 200:
all_files = []
for hash_file in response.json():
if hash_file.get("findings"):
all_files.append(File.from_json(hash_file))
else:
return "Unable to get a result with the given filesearch criteria"
return all_files

elif response.status_code == 403:
description = f"Searching {get_options(group)}not authorized"
raise AuthorizationError(description, response)

else:
description = "Unable to get a result with the given filesearch criteria"
raise FossologyApiError(description, response)
62 changes: 61 additions & 1 deletion fossology/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import fossology
from fossology.exceptions import FossologyApiError, FossologyUnsupported
from fossology.obj import Group
from fossology.obj import Group, MemberPerm, UserGroupMember

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -40,6 +40,31 @@ def list_groups(self) -> List:
description = f"Unable to get a list of groups for {self.user.name}"
raise FossologyApiError(description, response)

def list_group_members(self, group_id: int) -> List:
"""Get the list of members for a given group (accessible groups for user, all groups for admin)
API Endpoint: GET /groups/{id}/members
:return: a list of members
:rtype: list()
:raises FossologyApiError: if the REST call failed
"""
if fossology.versiontuple(self.version) < fossology.versiontuple("1.5.0"):
description = f"Endpoint /groups/id/members is not supported by your Fossology API version {self.version}"
raise FossologyUnsupported(description)

response = self.session.get(f"{self.api}/groups/{group_id}/members")
if response.status_code == 200:
members_list = []
response_list = response.json()
for member in response_list:
single_member = UserGroupMember.from_json(member)
members_list.append(single_member)
return members_list
else:
description = f"Unable to get a list of members for group {group_id}"
raise FossologyApiError(description, response)

def create_group(self, name):
"""Create a group
Expand All @@ -62,3 +87,38 @@ def create_group(self, name):
else:
description = f"Group {name} already exists, failed to create group or no group name provided"
raise FossologyApiError(description, response)

def add_group_member(
self, group_id: int, user_id: int, perm: MemberPerm = MemberPerm.USER
):
"""Add a user to a group
API Endpoint: POST /groups/{group_id}/user/{user_id}
:param group_id: the id of the group
:param user_id: the id of the user
:param perm: the permission level for the user
:type group_id: int
:type user_id: int
:type perm: MemberPerm
:raises FossologyApiError: if the REST call failed
"""
if fossology.versiontuple(self.version) < fossology.versiontuple("1.5.0"):
description = f"Endpoint /groups/id/user/id is not supported by your Fossology API version {self.version}"
raise FossologyUnsupported(description)

data = dict()
data["perm"] = perm.value
response = self.session.post(
f"{self.api}/groups/{group_id}/user/{user_id}", json=data
)
if response.status_code == 200:
logger.info(f"User {user_id} has been added to group {group_id}.")
elif response.status_code == 400:
logger.info(f"User {user_id} is already a member of group {group_id}.")
else:
description = (
f"An error occurred while adding user {user_id} to group {group_id}"
)
raise FossologyApiError(description, response)
return
Loading

0 comments on commit d6b1b38

Please sign in to comment.