Skip to content
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from vulnerabilities.pipelines.v2_importers import pysec_importer as pysec_importer_v2
from vulnerabilities.pipelines.v2_importers import redhat_importer as redhat_importer_v2
from vulnerabilities.pipelines.v2_importers import ruby_importer as ruby_importer_v2
from vulnerabilities.pipelines.v2_importers import ubuntu_osv_importer as ubuntu_osv_importer_v2
from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2
from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2
from vulnerabilities.utils import create_registry
Expand Down Expand Up @@ -107,6 +108,7 @@
debian_importer_v2.DebianImporterPipeline,
mattermost_importer_v2.MattermostImporterPipeline,
apache_tomcat_v2.ApacheTomcatImporterPipeline,
ubuntu_osv_importer_v2.UbuntuOSVImporterPipeline,
nvd_importer.NVDImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
gitlab_importer.GitLabImporterPipeline,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Generated by Django 4.2.25 on 2026-02-05 10:10

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0111_alter_advisoryseverity_scoring_system_and_more"),
]

operations = [
migrations.AlterField(
model_name="advisoryseverity",
name="scoring_system",
field=models.CharField(
choices=[
("cvssv2", "CVSSv2 Base Score"),
("cvssv3", "CVSSv3 Base Score"),
("cvssv3.1", "CVSSv3.1 Base Score"),
("cvssv4", "CVSSv4 Base Score"),
("rhbs", "RedHat Bugzilla severity"),
("rhas", "RedHat Aggregate severity"),
("archlinux", "Archlinux Vulnerability Group Severity"),
("cvssv3.1_qr", "CVSSv3.1 Qualitative Severity Rating"),
("generic_textual", "Generic textual severity rating"),
("apache_httpd", "Apache Httpd Severity"),
("apache_tomcat", "Apache Tomcat Severity"),
("epss", "Exploit Prediction Scoring System"),
("ssvc", "Stakeholder-Specific Vulnerability Categorization"),
("openssl", "OpenSSL Severity"),
("ubuntu-priority", "Ubuntu Priority"),
],
help_text="Identifier for the scoring system used. Available choices are: cvssv2: CVSSv2 Base Score,\ncvssv3: CVSSv3 Base Score,\ncvssv3.1: CVSSv3.1 Base Score,\ncvssv4: CVSSv4 Base Score,\nrhbs: RedHat Bugzilla severity,\nrhas: RedHat Aggregate severity,\narchlinux: Archlinux Vulnerability Group Severity,\ncvssv3.1_qr: CVSSv3.1 Qualitative Severity Rating,\ngeneric_textual: Generic textual severity rating,\napache_httpd: Apache Httpd Severity,\napache_tomcat: Apache Tomcat Severity,\nepss: Exploit Prediction Scoring System,\nssvc: Stakeholder-Specific Vulnerability Categorization,\nopenssl: OpenSSL Severity,\nubuntu-priority: Ubuntu Priority ",
max_length=50,
),
),
migrations.AlterField(
model_name="vulnerabilityseverity",
name="scoring_system",
field=models.CharField(
choices=[
("cvssv2", "CVSSv2 Base Score"),
("cvssv3", "CVSSv3 Base Score"),
("cvssv3.1", "CVSSv3.1 Base Score"),
("cvssv4", "CVSSv4 Base Score"),
("rhbs", "RedHat Bugzilla severity"),
("rhas", "RedHat Aggregate severity"),
("archlinux", "Archlinux Vulnerability Group Severity"),
("cvssv3.1_qr", "CVSSv3.1 Qualitative Severity Rating"),
("generic_textual", "Generic textual severity rating"),
("apache_httpd", "Apache Httpd Severity"),
("apache_tomcat", "Apache Tomcat Severity"),
("epss", "Exploit Prediction Scoring System"),
("ssvc", "Stakeholder-Specific Vulnerability Categorization"),
("openssl", "OpenSSL Severity"),
("ubuntu-priority", "Ubuntu Priority"),
],
help_text="Identifier for the scoring system used. Available choices are: cvssv2: CVSSv2 Base Score,\ncvssv3: CVSSv3 Base Score,\ncvssv3.1: CVSSv3.1 Base Score,\ncvssv4: CVSSv4 Base Score,\nrhbs: RedHat Bugzilla severity,\nrhas: RedHat Aggregate severity,\narchlinux: Archlinux Vulnerability Group Severity,\ncvssv3.1_qr: CVSSv3.1 Qualitative Severity Rating,\ngeneric_textual: Generic textual severity rating,\napache_httpd: Apache Httpd Severity,\napache_tomcat: Apache Tomcat Severity,\nepss: Exploit Prediction Scoring System,\nssvc: Stakeholder-Specific Vulnerability Categorization,\nopenssl: OpenSSL Severity,\nubuntu-priority: Ubuntu Priority ",
max_length=50,
),
),
]
9 changes: 8 additions & 1 deletion vulnerabilities/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ class VulnerableCodeBaseImporterPipelineV2(VulnerableCodePipeline):
repo_url = None
ignorable_versions = []

# Control how often progress log is shown (range: 1–100, higher value = less frequent log)
progress_step = 10

# When set to true pipeline is run only once.
# To rerun onetime pipeline reset is_active field to True via migration.
run_once = False
Expand Down Expand Up @@ -301,7 +304,11 @@ def collect_and_store_advisories(self):
if estimated_advisory_count > 0:
self.log(f"Collecting {estimated_advisory_count:,d} advisories")

progress = LoopProgress(total_iterations=estimated_advisory_count, logger=self.log)
progress = LoopProgress(
total_iterations=estimated_advisory_count,
logger=self.log,
progress_step=self.progress_step,
)
for advisory in progress.iter(self.collect_advisories()):
if advisory is None:
self.log("Advisory is None, skipping")
Expand Down
79 changes: 79 additions & 0 deletions vulnerabilities/pipelines/v2_importers/ubuntu_osv_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from pathlib import Path
from typing import Iterable

from fetchcode.vcs import fetch_via_vcs

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.pipes.osv_v2 import parse_advisory_data_v3
from vulnerabilities.utils import get_advisory_url
from vulnerabilities.utils import load_json


class UbuntuOSVImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
"""
Collect Ubuntu OSV format advisories.

Collect advisories from the GitHub Ubuntu Vulnerability Data repository.
"""

pipeline_id = "ubuntu_osv_importer_v2"
spdx_license_expression = "CC-BY-4.0"
license_url = "https://github.com/canonical/ubuntu-security-notices/blob/main/LICENSE"
repo_url = "git+https://github.com/canonical/ubuntu-security-notices/"

progress_step = 1

@classmethod
def steps(cls):
return (
cls.clone,
cls.collect_and_store_advisories,
cls.clean_downloads,
)

def clone(self):
self.log(f"Cloning `{self.repo_url}`")
self.vcs_response = fetch_via_vcs(self.repo_url)
self.advisories_path = Path(self.vcs_response.dest_dir)

def advisories_count(self):
cve_directory = self.advisories_path / "osv" / "cve"
return sum(1 for _ in cve_directory.rglob("*.json"))

def collect_advisories(self) -> Iterable[AdvisoryData]:
supported_ecosystems = ["deb"]
cve_directory = self.advisories_path / "osv" / "cve"

for file in cve_directory.rglob("*.json"):
advisory_url = get_advisory_url(
file=file,
base_path=self.advisories_path,
url="https://github.com/canonical/ubuntu-security-notices/blob/main/",
)
raw_data = load_json(file)
advisory_text = file.read_text()

yield parse_advisory_data_v3(
raw_data=raw_data,
supported_ecosystems=supported_ecosystems,
advisory_url=advisory_url,
advisory_text=advisory_text,
)

def clean_downloads(self):
if self.vcs_response:
self.log("Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()
16 changes: 8 additions & 8 deletions vulnerabilities/pipes/advisory.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,14 +360,14 @@ def insert_advisory_v2(
affected_package=affected_pkg,
logger=logger,
)
affected_packages_v2 = [
PackageV2.objects.get_or_create_from_purl(purl=purl)[0]
for purl in package_affected_purls
]
fixed_packages_v2 = [
PackageV2.objects.get_or_create_from_purl(purl=purl)[0]
for purl in package_fixed_purls
]

affected_packages_v2 = PackageV2.objects.bulk_get_or_create_from_purls(
purls=package_affected_purls
)
fixed_packages_v2 = PackageV2.objects.bulk_get_or_create_from_purls(
purls=package_fixed_purls
)

impact.affecting_packages.add(*affected_packages_v2)
impact.fixed_by_packages.add(*fixed_packages_v2)

Expand Down
68 changes: 45 additions & 23 deletions vulnerabilities/pipes/osv_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
"crates.io": "cargo",
}

OSV_TO_VCIO_SEVERITY_MAP = {
"cvss_v3": "cvssv3.1",
"cvss_v4": "cvssv4",
"ubuntu": "ubuntu-priority",
}


def parse_advisory_data_v3(
raw_data: dict, supported_ecosystems, advisory_url: str, advisory_text: str
Expand All @@ -67,9 +73,10 @@ def parse_advisory_data_v3(
details = raw_data.get("details") or ""
summary = build_description(summary=summary, description=details)
aliases = raw_data.get("aliases") or []
aliases.extend(raw_data.get("upstream", []))

date_published = get_published_date(raw_data=raw_data)
severities = list(get_severities(raw_data=raw_data))
severities = list(get_severities(raw_data=raw_data, url=advisory_url))
references = get_references_v2(raw_data=raw_data)

patches = []
Expand Down Expand Up @@ -236,29 +243,38 @@ def get_published_date(raw_data):
return published and dateparser.parse(date_string=published)


def get_severities(raw_data) -> Iterable[VulnerabilitySeverity]:
"""
Yield VulnerabilitySeverity extracted from a mapping of OSV ``raw_data``
"""
def get_severities(raw_data, url) -> Iterable[VulnerabilitySeverity]:
"""Yield VulnerabilitySeverity extracted from a mapping of OSV ``raw_data``"""
try:
for severity in raw_data.get("severity") or []:
vector = severity.get("score")
valid_vector = vector[:-1] if vector and vector.endswith("/") else vector

if severity.get("type") == "CVSS_V3":
system = SCORING_SYSTEMS["cvssv3.1"]
score = system.compute(valid_vector)
yield VulnerabilitySeverity(system=system, value=score, scoring_elements=vector)

elif severity.get("type") == "CVSS_V4":
system = SCORING_SYSTEMS["cvssv4"]
score = system.compute(valid_vector)
yield VulnerabilitySeverity(system=system, value=score, scoring_elements=vector)

else:
severity_type = severity.get("type")
value = severity.get("score")
severity_type = severity_type.lower()
scoring_element = None

if (
severity_type not in SCORING_SYSTEMS
and severity_type not in OSV_TO_VCIO_SEVERITY_MAP
):
logger.error(
f"Unsupported severity type: {severity!r} for OSV id: {raw_data.get('id')!r}"
)
continue

severity_type = OSV_TO_VCIO_SEVERITY_MAP.get(severity_type, severity_type)
system = SCORING_SYSTEMS[severity_type]

if severity_type in ["cvssv3.1", "cvssv4"]:
scoring_element = value
valid_vector = value[:-1] if value and value.endswith("/") else value
value = system.compute(valid_vector)

yield VulnerabilitySeverity(
system=system,
value=value,
scoring_elements=scoring_element,
url=url,
)
except (CVSS3MalformedError, CVSS4MalformedError) as e:
logger.error(f"Invalid severity {e}")

Expand Down Expand Up @@ -302,10 +318,11 @@ def get_affected_purl(affected_pkg, raw_id):
data and a ``raw_id``.
"""
package = affected_pkg.get("package") or {}
purl = package.get("purl")
if purl:
if purl := package.get("purl"):
try:
purl = PackageURL.from_string(purl)
purl_dict = PackageURL.from_string(purl).to_dict()
del purl_dict["version"]
purl = PackageURL(**purl_dict)
except ValueError:
logger.error(
f"Invalid PackageURL: {purl!r} for OSV "
Expand All @@ -314,12 +331,17 @@ def get_affected_purl(affected_pkg, raw_id):
else:
ecosys = package.get("ecosystem")
name = package.get("name")
namespace = ""

if ecosys and name:
ecosys = ecosys.lower()
purl_type = PURL_TYPE_BY_OSV_ECOSYSTEM.get(ecosys)
if ecosys.startswith("ubuntu"):
purl_type = "deb"
namespace = "ubuntu"

if not purl_type:
return
namespace = ""
if purl_type == "maven":
namespace, _, name = name.partition(":")

Expand Down
14 changes: 14 additions & 0 deletions vulnerabilities/severity_systems.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,19 @@ def get(self, scoring_elements: str) -> dict:
"Low",
]

UBUNTU_PRIORITY = ScoringSystem(
identifier="ubuntu-priority",
name="Ubuntu Priority",
url="https://ubuntu.com/security/cves/about#priority",
)
UBUNTU_PRIORITY.choices = [
"Critical",
"High",
"Medium",
"Low",
"Negligible",
]


@dataclasses.dataclass(order=True)
class EPSSScoringSystem(ScoringSystem):
Expand Down Expand Up @@ -239,5 +252,6 @@ def get(self, scoring_elements: str):
EPSS,
SSVC,
OPENSSL,
UBUNTU_PRIORITY,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def setUp(self):
self.logger = TestLogger()

@patch("vulnerabilities.pipelines.v2_importers.openssl_importer.OpenSSLImporterPipeline.clone")
def test_redhat_advisories_v2(self, mock_clone):
def test_openssl_advisories_v2(self, mock_clone):
mock_clone.__name__ = "clone"
pipeline = OpenSSLImporterPipeline()
pipeline.advisory_path = TEST_DATA
Expand Down
Loading