Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Package and install
run: |
tests/package.sh
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
tests/lint.sh
- name: Test with pytest
run: |
pytest
695 changes: 21 additions & 674 deletions LICENSE

Large diffs are not rendered by default.

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
6 changes: 6 additions & 0 deletions cloud_enum.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/sh

# This is a convenience script for use with a non-installed
# version of cloud_enum, such as one cloned from a git repo.

# Prepend the repo root to PYTHONPATH so "from cloud_enum import ..."
# resolves, then forward every CLI argument to the package entry point.
PYTHONPATH=$PYTHONPATH:$(pwd) python3 cloud_enum/main.py "$@"
Empty file added cloud_enum/__init__.py
Empty file.
113 changes: 113 additions & 0 deletions cloud_enum/arguments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
This module contains the argparser.
"""

import os
import sys
import argparse


def parse_arguments():
    """
    Handles user-passed parameters.

    Validates that all referenced files are readable/writeable and that
    the log format is supported, exiting with a non-zero status on any
    problem so scripted callers can detect the failure.

    Returns:
        argparse.Namespace: parsed and validated arguments. When a
        keyword file is given, its lines are loaded into args.keyword.
    """
    desc = "Multi-cloud enumeration utility. All hail OSINT!"
    parser = argparse.ArgumentParser(description=desc)

    # Grab the current dir of the script, for setting some defaults below
    script_path = os.path.split(os.path.abspath(sys.argv[0]))[0]

    kw_group = parser.add_mutually_exclusive_group(required=True)

    # Keyword can be given multiple times
    kw_group.add_argument('-k', '--keyword', type=str, action='append',
                          help='Keyword. Can use argument multiple times.')

    # OR, a keyword file can be used
    kw_group.add_argument('-kf', '--keyfile', type=str, action='store',
                          help='Input file with a single keyword per line.')

    # Use included mutations file by default, or let the user provide one
    parser.add_argument('-m', '--mutations', type=str, action='store',
                        default=script_path + '/fuzz.txt',
                        help='Mutations. Default: fuzz.txt')

    # Use included container brute-force list or let the user provide one
    parser.add_argument('-b', '--brute', type=str, action='store',
                        default=script_path + '/fuzz.txt',
                        help='List to brute-force Azure container names.'
                             ' Default: fuzz.txt')

    parser.add_argument('-t', '--threads', type=int, action='store',
                        default=5, help='Threads for HTTP brute-force.'
                                        ' Default = 5')

    parser.add_argument('-ns', '--nameserver', type=str, action='store',
                        default='8.8.8.8',
                        help='DNS server to use in brute-force.')

    parser.add_argument('-l', '--logfile', type=str, action='store',
                        help='Appends found items to specified file.')
    parser.add_argument('-f', '--format', type=str, action='store',
                        default='text',
                        help='Format for log file (text,json,csv)'
                             ' - default: text')

    parser.add_argument('--disable-aws', action='store_true',
                        help='Disable Amazon checks.')

    parser.add_argument('--disable-azure', action='store_true',
                        help='Disable Azure checks.')

    parser.add_argument('--disable-gcp', action='store_true',
                        help='Disable Google checks.')

    parser.add_argument('-qs', '--quickscan', action='store_true',
                        help='Disable all mutations and second-level scans')

    args = parser.parse_args()

    # Ensure mutations file is readable.
    # Fixed: bare sys.exit() exits with status 0, which reports success
    # to the shell even though validation failed; exit 1 instead.
    if not os.access(args.mutations, os.R_OK):
        print(f"[!] Cannot access mutations file: {args.mutations}")
        sys.exit(1)

    # Ensure brute file is readable
    if not os.access(args.brute, os.R_OK):
        print("[!] Cannot access brute-force file, exiting")
        sys.exit(1)

    # Ensure keywords file is readable
    if args.keyfile:
        if not os.access(args.keyfile, os.R_OK):
            print("[!] Cannot access keyword file, exiting")
            sys.exit(1)

        # Parse keywords from input file
        with open(args.keyfile, encoding='utf-8') as infile:
            args.keyword = [keyword.strip() for keyword in infile]

    # Ensure log file is writeable
    if args.logfile:
        if os.path.isdir(args.logfile):
            print("[!] Can't specify a directory as the logfile, exiting.")
            sys.exit(1)
        if os.path.isfile(args.logfile):
            target = args.logfile
        else:
            # Writing a new file: check the containing directory instead
            target = os.path.dirname(args.logfile)
            if target == '':
                target = '.'

        if not os.access(target, os.W_OK):
            print("[!] Cannot write to log file, exiting")
            sys.exit(1)

    # Set up logging format
    if args.format not in ('text', 'json', 'csv'):
        print("[!] Sorry! Allowed log formats: 'text', 'json', or 'csv'")
        sys.exit(1)
    # Set the global in the utils file, where logging needs to happen
    # utils.init_logfile(args.logfile, args.format)

    return args
119 changes: 119 additions & 0 deletions cloud_enum/cloud_checkers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Cloud Checker will contain classes to be re-used in each cloud provider.

The intent is to have a common foundation that can be used for all checks.
"""

from enum import Enum
import requests


class AccessLevel(Enum):
    """
    Enumerates how accessible a discovered resource is.

    PUBLIC resources are openly reachable, PROTECTED ones exist but deny
    anonymous access, and DISABLED ones are present but switched off.
    """

    PUBLIC = 1
    PROTECTED = 2
    DISABLED = 3


class Checker:
    """
    Contains the base functionality used by both HTTPChecker and DNSChecker.
    """
    def __init__(self, threads=5):
        # Worker count used by subclasses when brute-forcing
        self.threads = threads
        # De-duplicated set of targets (full URL strings)
        self.targets = set()
        # Signature dicts registered via add_sig()
        self.sigs = []

    def add_targets(self, targets):
        """
        Targets should be a list of strings in full URL format.

        Raises:
            TypeError: if targets is not a list.
        """
        if not isinstance(targets, list):
            raise TypeError("Must be a list")
        self.targets.update(targets)

    def add_sig(self, raw_sig):
        """
        Adds a definition of a finding.

        This consists of the following:
        finding: text describing the finding (string)
        access: enum of AccessLevel (1, 2, 3) that can be used to assess
        severity
        resp_code: for HTTP scraping, the response code (int)
        resp_text: for HTTP scraping, the response text (string)
        dns: for DNS scraping, set to True (bool)

        There might be multiple signatures for a single type of target. For
        example, a GCP bucket would have a signature with a resp code of 200
        for open buckets and 403 for a protected bucket.

        Raises:
            ValueError: if neither resp_code nor dns is provided.
            TypeError: if any field has the wrong type.
        """
        new_sig = dict(
            finding=raw_sig.get("finding", None),
            access=raw_sig.get("access", None),
            resp_code=raw_sig.get("resp_code", None),
            resp_text=raw_sig.get("resp_text", None),
            dns=raw_sig.get("dns", False)
        )

        # A signature must have at least an HTTP response code or a DNS check
        if not new_sig["dns"] and not new_sig["resp_code"]:
            raise ValueError("Must have at least resp_code or dns")

        # Type check everything
        if not isinstance(new_sig["resp_code"], (int, type(None))):
            # Fixed: this error previously read "Must be a string" even
            # though the field is checked as an int.
            raise TypeError("Must be an int")
        if not isinstance(new_sig["finding"], (str, type(None))):
            raise TypeError("Must be a string")
        if not isinstance(new_sig["access"], (AccessLevel, type(None))):
            raise TypeError("Must be an AccessLevel enum")
        if not isinstance(new_sig["resp_text"], (str, type(None))):
            raise TypeError("Must be a string")
        # Fixed: the check previously accepted any str here, contradicting
        # both the docstring ("bool") and this error message.
        if not isinstance(new_sig["dns"], bool):
            raise TypeError("Must be a bool")

        self.sigs.append(new_sig)


class HTTPChecker(Checker):
    """
    Used to perform simple web-scraping, analyzing the results based on
    known pattern matches of HTTP response codes and text.
    """

    @staticmethod
    def check_target(target, sig, timeout=10):
        """
        Checks an individual target for a pattern match.

        Args:
            target: full URL to request (string)
            sig: signature dict as built by Checker.add_sig()
            timeout: seconds before the request is aborted (int/float)

        Returns True/False based on the HTTP response and the provided
        signature.
        """
        try:
            # Fixed: a request with no timeout can hang the scan forever
            # on an unresponsive host.
            resp = requests.get(target, timeout=timeout)
        except requests.exceptions.ConnectionError as error_msg:
            print(f" [!] Connection error on {target}:")
            print(error_msg)
            return False
        except requests.exceptions.Timeout:
            # Fixed: requests raises requests.exceptions.Timeout, which is
            # not a subclass of the builtin TimeoutError the original
            # caught, so timeouts previously escaped this handler.
            print(f" [!] Timeout on {target}.")
            return False

        if resp.status_code == sig["resp_code"]:
            if not sig["resp_text"]:
                # Simple checks match only the response status code
                return True
            if sig["resp_text"] in resp.text:
                # Some checks also require matching response text
                return True

        return False


class DNSChecker(Checker):
    """
    Brute-forces names via DNS queries, matching responses against the
    results that registered signatures expect.
    """
File renamed without changes.
18 changes: 18 additions & 0 deletions cloud_enum/gcp_sigs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""
This module contains only variable names that define GCP-related signatures
"""

from cloud_enum import cloud_checkers


# An HTTP 200 on a bucket URL means anonymous listing/reading succeeded.
gcp_open_bucket = {
    "finding": "Open GCP Bucket",
    "access": cloud_checkers.AccessLevel.PUBLIC,
    "resp_code": 200,
}

# An HTTP 403 means the bucket exists but denies anonymous access.
gcp_protected_bucket = {
    "finding": "Protected GCP Bucket",
    "access": cloud_checkers.AccessLevel.PROTECTED,
    "resp_code": 403,
}
43 changes: 43 additions & 0 deletions cloud_enum/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""
cloud_enum by initstring
https://github.com/initstring/cloud_enum

MIT License

Multi-cloud OSINT tool designed to enumerate storage and services in AWS,
Azure, and GCP.

Please enjoy responsibly.
"""

from cloud_enum import arguments
from cloud_enum import utils


# ASCII banner printed once at startup by main()
BANNER = '''
##########################
cloud_enum
github.com/initstring
##########################

'''


def main():
    """
    Entry point: parse arguments, show a status summary, and build the
    list of candidate target names.
    """
    args = arguments.parse_arguments()

    print(BANNER)

    # Generate a basic status on targets and parameters
    utils.print_status(args)

    # First, build a sorted base list of target names.  Quickscan skips
    # mutations entirely; otherwise they are loaded from disk.
    mutations = [] if args.quickscan else utils.read_mutations(args.mutations)

    # NOTE(review): `names` is not consumed yet — presumably the per-cloud
    # check modules will use it as the tool is built out.
    names = utils.build_names(args.keyword, mutations)


if __name__ == "__main__":
    main()
72 changes: 72 additions & 0 deletions cloud_enum/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
This module will contain basic utilities and helper functions
"""

import re


def print_status(args):
    """
    Display a short summary of the scan configuration before running.
    """
    print(f"Keywords: {', '.join(args.keyword)}")

    # Quickscan runs with no mutations at all
    if args.quickscan:
        mutation_info = "NONE! (Using quickscan)"
    else:
        mutation_info = args.mutations
    print(f"Mutations: {mutation_info}")

    print(f"Brute-list: {args.brute}")
    print("")


def read_mutations(mutations_file):
    """
    Load the mutations wordlist from disk, one entry per line.
    """
    # errors="ignore" tolerates stray non-UTF8 bytes in user wordlists
    with open(mutations_file, encoding="utf8", errors="ignore") as handle:
        contents = handle.read()
    wordlist = contents.splitlines()

    print(f"[+] Mutations list imported: {len(wordlist)} items")
    return wordlist


def clean_text(text):
    """
    Normalize text for DNS/hostname use: lowercase it and strip every
    character outside a-z, 0-9, '.' and '-'.
    """
    return re.sub(r'[^a-z0-9.-]', '', text.lower())


def build_names(base_list, mutations):
    """
    Combine base and mutations for processing by individual modules.

    Each cleaned keyword appears on its own, then joined with each cleaned
    mutation appended and prepended, using no separator, '.', and '-'.
    """
    names = []

    for raw_base in base_list:
        base = clean_text(raw_base)

        # The bare keyword is always a candidate by itself
        names.append(base)

        for raw_mutation in mutations:
            mutation = clean_text(raw_mutation)

            # Appends first, then prepends, in each separator style
            names.extend([
                f"{base}{mutation}",
                f"{base}.{mutation}",
                f"{base}-{mutation}",
                f"{mutation}{base}",
                f"{mutation}.{base}",
                f"{mutation}-{base}",
            ])

    print(f"[+] Mutated results: {len(names)} items")

    return names
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build-system]
build-backend = "setuptools.build_meta"
requires = ["setuptools", "wheel"]
Loading