-
Notifications
You must be signed in to change notification settings - Fork 505
/
tasks.py
executable file
·124 lines (101 loc) · 3.55 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python
import sys
import os
import logging
from invoke import task, Collection
# Directory containing this tasks file; both paths below are derived from it.
_TASKS_DIR = os.path.dirname(__file__)

# Absolute path to the CLI script that the test tasks execute directly.
BIN = os.path.abspath(os.path.join(_TASKS_DIR, "endgame", "bin", "cli.py"))

# Append "<this file's directory>/../endgame" to the import search path.
# NOTE(review): BIN resolves under this file's directory while this entry resolves
# under its parent directory - confirm that both locations are intended.
_ENDGAME_DIR = os.path.abspath(os.path.join(_TASKS_DIR, os.path.pardir, "endgame"))
sys.path.append(_ENDGAME_DIR)

logger = logging.getLogger(__name__)
# Services whose resources the expose tasks operate on, in execution order.
# "secretsmanager" is intentionally left out of this list for now.
EXPOSE_SERVICES = ["iam", "ecr", "lambda"]
# Services that the list-resources task iterates over, in execution order.
LIST_SERVICES = ["iam", "lambda", "ecr", "efs", "secretsmanager", "s3"]
# The principal ARN is read from the environment once, when this module is loaded.
# Loading fails immediately if it is unset or empty, so no task can run without it.
EVIL_PRINCIPAL = os.getenv("EVIL_PRINCIPAL")
if not EVIL_PRINCIPAL:
    raise Exception(
        "Please set the EVIL_PRINCIPAL environment variable to the ARN of the rogue principal that you "
        "want to give access to."
    )
# Namespaces: a root collection ("ns") that nests a "test" collection.
test = Collection("test")
ns = Collection()
ns.add_collection(test)
# def exception_handler(func):
# def inner_function(*args, **kwargs):
# try:
# func(*args, **kwargs)
# except UnexpectedExit as u_e:
# logger.critical(f"FAIL! UnexpectedExit: {u_e}")
# sys.exit(1)
# except Failure as f_e:
# logger.critical(f"FAIL: Failure: {f_e}")
# sys.exit(1)
#
# return inner_function
# BUILD
@task
def build_package(c):
    """Build the endgame package from the current directory contents for use with PyPI.

    Upgrades setuptools and wheel with pip, then runs setup.py to produce the
    source distribution and the wheel.

    Args:
        c: Invoke context used to run the shell commands.
    """
    c.run('python -m pip install --upgrade setuptools wheel')
    c.run('python setup.py -q sdist bdist_wheel')
@task(pre=[build_package])
def install_package(c):
    """Install the locally built endgame source distribution from dist/ (not from PyPI)."""
    sdist_glob = 'dist/endgame-*.tar.gz'
    c.run(f'pip3 install -q {sdist_glob}')
@task
def create_terraform(c):
    """Run the ``terraform-demo`` make target."""
    make_command = "make terraform-demo"
    c.run(make_command)
@task
def destroy_terraform(c):
    """Run the ``terraform-destroy`` make target."""
    make_command = "make terraform-destroy"
    c.run(make_command)
# @exception_handler
# @task(pre=[create_terraform], post=[destroy_terraform])
# @task
@task(pre=[install_package])
def list_resources(c):
    """Run the list-resources command against every service in LIST_SERVICES.

    A header line is echoed before each service so the output of the individual
    runs can be told apart.

    Args:
        c: Invoke context used to run the shell commands.
    """
    for service in LIST_SERVICES:
        c.run(f"echo '\nListing {service}'", pty=True)
        # Previously only the header above was echoed and nothing was listed.
        # NOTE(review): the subcommand/flag spelling mirrors the `expose --service`
        # invocations in this file - confirm against the CLI.
        c.run(f"{BIN} list-resources --service {service}", pty=True)
# @exception_handler
# @task(pre=[create_terraform], post=[destroy_terraform])
@task
def expose_dry_run(c):
    """Dry run: invoke the expose command with --dry-run for each service in EXPOSE_SERVICES."""
    dry_run_commands = [
        f"{BIN} expose --service {service} --name test-resource-exposure --dry-run"
        for service in EXPOSE_SERVICES
    ]
    for command in dry_run_commands:
        c.run(command, pty=True)
# @exception_handler
# @task(pre=[create_terraform], post=[destroy_terraform])
@task
def expose_undo(c):
    """Test the undo capability by exposing each service and then undoing the exposure.

    For every service in EXPOSE_SERVICES the expose command is run, followed by the
    same command with --undo. The undo is attempted even when the expose command
    raises (for example on a non-zero exit), so a failed expose step does not skip
    the undo for that service; the original error still propagates afterwards.

    Args:
        c: Invoke context used to run the shell commands.
    """
    c.run(f"echo 'Exposing the Terraform infrastructure to {EVIL_PRINCIPAL}'")
    for service in EXPOSE_SERVICES:
        try:
            c.run(f"{BIN} expose --service {service} --name test-resource-exposure ", pty=True)
        finally:
            # Always attempt the undo so a failing expose cannot leave the resource exposed.
            c.run(f"echo 'Undoing the exposure to {EVIL_PRINCIPAL} before destroying, just to be extra sure and to test "
                  f"it out.'")
            c.run(f"{BIN} expose --service {service} --name test-resource-exposure --undo", pty=True)
# @exception_handler
# @task(pre=[create_terraform], post=[destroy_terraform])
@task
def expose(c):
    """Real exposure: run the expose command for each service without --dry-run and without a following --undo."""
    announcement = f"echo 'Exposing the Terraform infrastructure to {EVIL_PRINCIPAL}'"
    for service in EXPOSE_SERVICES:
        # The announcement is repeated before every service.
        c.run(announcement)
        expose_command = f"{BIN} expose --service {service} --name test-resource-exposure"
        c.run(expose_command, pty=True)
# Register the test tasks in the "test" collection under their hyphenated names.
for _task_obj, _cli_name in (
    (list_resources, "list-resources"),
    (expose_dry_run, "expose-dry-run"),
    (expose_undo, "expose-undo"),
    (expose, "expose"),
):
    test.add_task(_task_obj, _cli_name)