forked from SocketDev/socket-python-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoutput.py
More file actions
147 lines (123 loc) · 6.33 KB
/
output.py
File metadata and controls
147 lines (123 loc) · 6.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional
from .core.messages import Messages
from .core.classes import Diff, Issue
from .config import CliConfig
from socketsecurity.plugins.manager import PluginManager
from socketdev import socketdev
class OutputHandler:
    """Render a diff report to the console and forward it to optional plugins.

    The handler reads all of its settings from ``self.config`` (output
    format flags, plugin settings, blocking behavior, SBOM file path) and
    writes log output through the ``"socketcli"`` logger.
    """

    config: CliConfig
    logger: logging.Logger

    def __init__(self, config: CliConfig, sdk: socketdev):
        """Store the CLI configuration and bind the ``socketcli`` logger.

        Args:
            config: CLI configuration consulted by every other method.
            sdk: Accepted for interface compatibility; not used by the code
                in this class.
        """
        self.config = config
        self.logger = logging.getLogger("socketcli")

    def handle_output(self, diff_report: Diff) -> None:
        """Main output handler that determines output format.

        Emits the report in exactly one console format (JSON takes priority
        over SARIF, which takes priority over plain console comments), then
        forwards the report to the Jira and Slack plugins when they are
        enabled, and finally saves the SBOM file.

        Args:
            diff_report: The diff report to output.
        """
        if self.config.enable_json:
            self.output_console_json(diff_report, self.config.sbom_file)
        elif self.config.enable_sarif:
            self.output_console_sarif(diff_report, self.config.sbom_file)
        else:
            self.output_console_comments(diff_report, self.config.sbom_file)

        self._send_to_plugin("jira", self.config.jira_plugin, diff_report)

        # Debug Slack webhook configuration when debug is enabled
        # (always shown when debug is on, even if the Slack plugin is off).
        if self.config.enable_debug:
            self._log_slack_debug_info()

        self._send_to_plugin("slack", self.config.slack_plugin, diff_report)

        # Runs for every output format; the JSON and SARIF paths may already
        # have written the same file, in which case it is simply rewritten.
        self.save_sbom_file(diff_report, self.config.sbom_file)

    def _send_to_plugin(self, name: str, plugin_config: Any, diff_report: Diff) -> None:
        """Send ``diff_report`` to the plugin ``name`` if that plugin is enabled.

        Args:
            name: Key under which the plugin is registered with
                ``PluginManager`` (``"jira"`` or ``"slack"`` in this class).
            plugin_config: Object exposing ``enabled``, ``levels`` and
                ``config`` attributes.
            diff_report: The diff report to forward.
        """
        if not plugin_config.enabled:
            return
        settings = {
            "enabled": plugin_config.enabled,
            "levels": plugin_config.levels or [],
            # Spread last: plugin-specific keys win over the two keys above
            # if they collide.
            **(plugin_config.config or {}),
        }
        PluginManager({name: settings}).send(diff_report, config=self.config)

    def _log_slack_debug_info(self) -> None:
        """Log the Slack plugin configuration and related environment variables.

        NOTE(review): the webhook URL and the raw SOCKET_SLACK_CONFIG_JSON
        value are logged verbatim. Both may contain secrets; consider masking
        them if debug logs can be read by untrusted parties.
        """
        import os

        slack_plugin = self.config.slack_plugin
        slack_enabled_env = os.getenv("SOCKET_SLACK_ENABLED", "Not set")
        slack_config_env = os.getenv("SOCKET_SLACK_CONFIG_JSON", "Not set")

        slack_url = "Not configured"
        if slack_plugin.config and slack_plugin.config.get("url"):
            slack_url = slack_plugin.config.get("url")

        self.logger.debug("=== Slack Webhook Debug Information ===")
        self.logger.debug(f"Slack Plugin Enabled: {slack_plugin.enabled}")
        self.logger.debug(f"SOCKET_SLACK_ENABLED environment variable: {slack_enabled_env}")
        self.logger.debug(f"SOCKET_SLACK_CONFIG_JSON environment variable: {slack_config_env}")
        self.logger.debug(f"Slack Webhook URL: {slack_url}")
        self.logger.debug(f"Slack Alert Levels: {slack_plugin.levels}")
        self.logger.debug("=====================================")

    def return_exit_code(self, diff_report: Diff) -> int:
        """Return the process exit code for ``diff_report``.

        Returns:
            ``0`` when blocking is disabled or the report passes,
            ``1`` when the report does not pass.
        """
        if self.config.disable_blocking:
            return 0
        if not self.report_pass(diff_report):
            return 1
        # Reports that contain only warning alerts must exit 0. A previously
        # commented-out variant returned 5 for "warnings but no blocking
        # alerts"; per the original note that was not the intended behavior.
        return 0

    def output_console_comments(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None:
        """Outputs formatted console comments.

        Logs ``"No issues found"`` when the report has no new alerts;
        otherwise logs the diff URL and the security alert table built by
        ``Messages``.

        Args:
            diff_report: The diff report to output.
            sbom_file_name: Accepted for signature parity with the other
                ``output_console_*`` methods; not used here.
        """
        # Truthiness check (rather than ``len(...) == 0``) so that a missing
        # alert list (``None``) is treated like an empty one, matching
        # ``report_pass``.
        if not diff_report.new_alerts:
            self.logger.info("No issues found")
            return

        console_security_comment = Messages.create_console_security_alert_table(diff_report)
        self.logger.info("Security issues detected by Socket Security:")
        self.logger.info(f"Diff Url: {diff_report.diff_url}")
        self.logger.info(f"\n{console_security_comment}")

    def output_console_json(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None:
        """Outputs JSON formatted results.

        Builds the JSON security comment via ``Messages``, saves the SBOM
        file, and emits the serialized JSON through the logger at INFO level.

        Args:
            diff_report: The diff report to output.
            sbom_file_name: Optional path the SBOM is written to.
        """
        console_security_comment = Messages.create_security_comment_json(diff_report)
        self.save_sbom_file(diff_report, sbom_file_name)
        self.logger.info(json.dumps(console_security_comment))

    def output_console_sarif(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None:
        """Generate SARIF output from the diff report and print to console.

        Nothing is generated, saved or printed when the report id is
        ``"NO_DIFF_RAN"``.

        Args:
            diff_report: The diff report to output.
            sbom_file_name: Optional path the SBOM is written to.
        """
        if diff_report.id == "NO_DIFF_RAN":
            return

        # Generate the SARIF structure using Messages
        console_security_comment = Messages.create_security_comment_sarif(diff_report)
        self.save_sbom_file(diff_report, sbom_file_name)
        # Print the SARIF output to the console in JSON format (stdout, not
        # the logger).
        print(json.dumps(console_security_comment, indent=2))

    def report_pass(self, diff_report: Diff) -> bool:
        """Determines if the report passes security checks.

        Returns:
            ``True`` when there are no new alerts, when blocking is disabled,
            or when none of the new alerts has a truthy ``error`` flag;
            ``False`` otherwise.
        """
        if not diff_report.new_alerts:
            return True
        if self.config.disable_blocking:
            return True
        return not any(issue.error for issue in diff_report.new_alerts)

    def save_sbom_file(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None:
        """Saves SBOM file if filename is provided.

        Does nothing when ``sbom_file_name`` is empty or the report carries
        no SBOM. Missing parent directories are created.

        Args:
            diff_report: Report whose ``sbom`` attribute is serialized.
            sbom_file_name: Destination path of the JSON file.
        """
        if not sbom_file_name or not diff_report.sbom:
            return

        sbom_path = Path(sbom_file_name)
        sbom_path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so the written file does not depend on the
        # platform's default text encoding.
        with sbom_path.open("w", encoding="utf-8") as f:
            json.dump(diff_report.sbom, f, indent=2)

    def _output_issue(self, issue: Issue) -> None:
        """Helper method to format and output a single issue.

        Logs the issue at WARNING level: status and severity, title, and the
        description and suggestion when they are non-empty.
        """
        severity = issue.severity.upper() if issue.severity else "UNKNOWN"
        status = "🚫 Blocking" if issue.error else "⚠️ Warning"

        self.logger.warning(f"\n{status} - Severity: {severity}")
        self.logger.warning(f"Title: {issue.title}")
        if issue.description:
            self.logger.warning(f"Description: {issue.description}")
        if issue.suggestion:
            self.logger.warning(f"suggestion: {issue.suggestion}")

    def _format_issue(self, issue: Issue) -> Dict[str, Any]:
        """Helper method to format an issue for JSON output.

        Returns:
            A dict with the issue's ``purl``, ``title``, ``description`` and
            ``severity``, plus ``blocking`` taken from ``issue.error``.
        """
        return {
            "purl": issue.purl,
            "title": issue.title,
            "description": issue.description,
            "severity": issue.severity,
            "blocking": issue.error,
        }