-
Notifications
You must be signed in to change notification settings - Fork 43
Expand file tree
/
Copy pathconftest.py
More file actions
365 lines (294 loc) · 12 KB
/
conftest.py
File metadata and controls
365 lines (294 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
from datetime import datetime
from functools import wraps
from re import match
from urllib.parse import urlparse
import pytest
from flask import url_for
from flask_login import current_user
from tracker import advisory
from tracker import create_app
from tracker import db as flask_db
from tracker.advisory import advisory_get_label
from tracker.model.advisory import Advisory
from tracker.model.advisory import advisory_regex
from tracker.model.cve import CVE
from tracker.model.cve import issue_types
from tracker.model.cvegroup import CVEGroup
from tracker.model.cvegroupentry import CVEGroupEntry
from tracker.model.cvegrouppackage import CVEGroupPackage
from tracker.model.enum import Affected
from tracker.model.enum import Publication
from tracker.model.enum import Remote
from tracker.model.enum import Severity
from tracker.model.enum import UserRole
from tracker.model.enum import affected_to_status
from tracker.model.enum import highest_severity
from tracker.model.package import Package
from tracker.model.user import User
from tracker.user import hash_password
from tracker.user import random_string
# Advisory id the advisory helpers in this module default to; it is whatever
# advisory_get_label() returns when called without arguments at import time.
DEFAULT_ADVISORY_ID = advisory_get_label()
# User name that create_user() and logged_in() default to.
DEFAULT_USERNAME = 'cyberwehr12345678'
# Message texts; presumably compared against rendered responses by the tests
# that import them -- confirm against the callers.
ERROR_LOGIN_REQUIRED = 'Please log in to access this page.'
ERROR_INVALID_CHOICE = 'Not a valid choice'
@pytest.fixture(scope="session")
def app(request):
    """Session-scoped fixture yielding the Flask application under test.

    The application comes from ``create_app()``; its configuration is then
    overridden for testing (in-memory SQLite database, CSRF checks disabled,
    fixed server name) and the application is yielded from inside its own
    application context.
    """
    test_settings = (
        ('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:'),
        ('TESTING', True),
        ('WTF_CSRF_ENABLED', False),
        ('SQLALCHEMY_TRACK_MODIFICATIONS', False),
        ('SERVER_NAME', 'cyber.local'),
    )
    application = create_app()
    for setting, value in test_settings:
        application.config[setting] = value
    with application.app_context():
        yield application
@pytest.fixture(scope="session")
def client(app):
    """Session-scoped fixture returning ``app.test_client()``."""
    test_client = app.test_client()
    return test_client
@pytest.fixture(scope="session")
def db(app, request):
    """Session-scoped fixture yielding the tracker's database object.

    The object is yielded inside an application context of ``app``; once the
    session is over, ``drop_all()`` is called on it.
    """
    context = app.app_context()
    with context:
        yield flask_db
        # Session teardown: remove whatever tables are still present.
        flask_db.drop_all()
@pytest.fixture(autouse=True, scope='function')
def run_scoped(app, db, client, request):
    """Autouse, function-scoped fixture giving every test a fresh schema.

    Inside an application context it creates all tables, lets the test run
    while the ``client`` context manager is entered, and afterwards removes
    the session and drops all tables again.
    """
    with app.app_context():
        db.create_all()
        with client:
            yield
        # Teardown: discard the session first, then drop the tables.
        db.session.remove()
        db.drop_all()
@pytest.fixture(scope='function')
def patch_get(monkeypatch, request):
    """Replace ``advisory.get`` with a stub that returns a canned response.

    Without parametrization the stub answers with status code 200 and empty
    text. When the fixture is parametrized, a ``str`` parameter becomes the
    response text and any other parameter becomes the status code.

    If the requested URI's path (one leading ``/`` removed) matches
    ``advisory_regex``, the text is replaced by generated advisory content
    wrapped in a ``<PRE>`` block. That replacement is stored in the enclosing
    scope, so it is also what later calls of the same stub return.
    """
    text = ''
    status_code = 200
    if hasattr(request, 'param'):
        param = request.param
        if isinstance(param, str):
            text = param
        else:
            status_code = param

    def mocked_get(uri, *args, **kwargs):
        nonlocal text
        requested_path = urlparse(uri).path
        if requested_path.startswith('/'):
            requested_path = requested_path[1:]
        if match(advisory_regex, requested_path):
            content = create_advisory_content(id=requested_path)
            text = '<PRE>{}\n-------------- next part --------------</PRE>'.format(content)
        # Ad-hoc class whose instances expose just the two attributes set here.
        response_cls = type('MockedReq', (), {'status_code': status_code, 'text': text})
        return response_cls()

    monkeypatch.setattr(advisory, 'get', mocked_get)
def assert_logged_in(response, status_code=200):
    """Assert that *response* was produced for an authenticated user.

    Checks, in this order: the response status equals *status_code*, the
    body contains ``logout``, the body does not contain ``login``, and
    ``current_user.is_authenticated`` is truthy.
    """
    assert response.status_code == status_code
    body = response.data
    assert b'logout' in body
    assert not (b'login' in body)
    assert current_user.is_authenticated
def assert_not_logged_in(response, status_code=200):
    """Assert that *response* was produced for an anonymous user.

    Checks, in this order: the response status equals *status_code*, the
    body does not contain ``logout``, the body contains ``login``, and
    ``current_user.is_authenticated`` is falsy.
    """
    assert response.status_code == status_code
    body = response.data
    assert not (b'logout' in body)
    assert b'login' in body
    assert not current_user.is_authenticated
def logged_in(func=None, role=UserRole.administrator, username=DEFAULT_USERNAME, password=None):
    """Decorator that creates a user and logs it in before the test runs.

    Works both bare (``@logged_in``) and with arguments
    (``@logged_in(role=...)``). The user is created through ``create_user``;
    the wrapper then posts the credentials to the ``tracker.login`` endpoint,
    asserts the login succeeded and finally calls the test with ``db`` and
    ``client`` passed as keyword arguments.

    When *password* is falsy the user name doubles as the password.
    """
    def decorator(func):
        @create_user(role=role, username=username, password=password)
        @wraps(func)
        def wrapper(db, client, *args, **kwargs):
            credentials = dict(username=username, password=password or username)
            login_url = url_for('tracker.login')
            resp = client.post(login_url, follow_redirects=True, data=credentials)
            assert_logged_in(resp)
            func(db=db, client=client, *args, **kwargs)
        return wrapper

    return decorator(func) if func else decorator
def create_user(func=None, username=DEFAULT_USERNAME, password=None, role=UserRole.reporter,
                email=None, salt=None, active=True, idp_id=None):
    """Decorator that stores a ``User`` in the database before the test runs.

    Works both bare (``@create_user``) and with arguments. Falsy values fall
    back as follows: *password* to the user name, *email* to
    ``<username>@cyber.cyber`` and *salt* to ``random_string()``. The stored
    password is ``hash_password`` applied to the password and the salt.

    The wrapped test must receive ``db`` as its first positional argument; it
    is called with ``db`` passed by keyword.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(db, *args, **kwargs):
            account = User()
            initial_values = (
                ('active', active),
                ('name', username),
                ('password', password or username),
                ('role', role),
                ('email', email or '{}@cyber.cyber'.format(username)),
                ('salt', salt or random_string()),
            )
            for attribute, value in initial_values:
                setattr(account, attribute, value)
            # Swap the clear-text password set above for its salted hash.
            account.password = hash_password(account.password, account.salt)
            account.idp_id = idp_id
            db.session.add(account)
            db.session.commit()
            func(db=db, *args, **kwargs)
        return wrapper

    return decorator(func) if func else decorator
# Issue (CVE) id that the issue and group helpers in this module default to.
DEFAULT_ISSUE_ID = 'CVE-2016-1337'
def default_issue_dict(overrides=None):
    """Return a dict of default issue values with *overrides* applied on top.

    Args:
        overrides: Optional mapping (or iterable of key/value pairs) whose
            entries replace or extend the defaults. ``None`` means no
            overrides.

    Returns:
        A new dict on every call. ``changed`` is the string form of the
        naive epoch timestamp, ``'1970-01-01 00:00:00'``.
    """
    # datetime(1970, 1, 1) equals datetime.utcfromtimestamp(0) without going
    # through the call that is deprecated since Python 3.12.
    data = dict(cve=DEFAULT_ISSUE_ID, issue_type=issue_types[0], remote=Remote.unknown.name,
                severity=Severity.unknown.name, description='', notes='', reference='',
                changed=str(datetime(1970, 1, 1)))
    if overrides is not None:
        data.update(overrides)
    return data
def create_issue(func=None, id=DEFAULT_ISSUE_ID, issue_type=issue_types[0], remote=Remote.unknown,
                 severity=Severity.unknown, description='', notes='', reference='',
                 changed=datetime.utcfromtimestamp(0), count=1):
    """Decorator that stores one or more ``CVE`` rows before the test runs.

    Works both bare (``@create_issue``) and with arguments. With ``count``
    of 1 or less a single id is used as given; with a larger ``count`` each
    issue gets *id* followed by its 1-based sequence number. All other
    attributes are identical across the created issues.

    The wrapped test must receive ``db`` as its first positional argument; it
    is called with ``db`` passed by keyword.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(db, *args, **kwargs):
            shared_values = (
                ('issue_type', issue_type),
                ('remote', remote),
                ('severity', severity),
                ('description', description),
                ('notes', notes),
                ('reference', reference),
                ('changed', changed),
            )
            for sequence in range(1, count + 1):
                cve = CVE()
                cve.id = '{}{}'.format(id, sequence) if count > 1 else id
                for attribute, value in shared_values:
                    setattr(cve, attribute, value)
                db.session.add(cve)
            db.session.commit()
            func(db=db, *args, **kwargs)
        return wrapper

    return decorator(func) if func else decorator
def create_package(func=None, id=None, name=None, base=None, version='1.0-1', arch='any',
                   database='core', description='', url=None, filename='somefile-1.0-1-any.tar.xz',
                   sha256sum='sha256', builddate=0):
    """Decorator that stores a ``Package`` row before the test runs.

    Works both bare (``@create_package``) and with arguments. The package id
    is only assigned when *id* is truthy, and a falsy *base* falls back to
    *name*.

    The wrapped test must receive ``db`` as its first positional argument; it
    is called with ``db`` passed by keyword.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(db, *args, **kwargs):
            pkg = Package()
            if id:
                pkg.id = id
            values = (
                ('name', name),
                ('base', base or name),
                ('version', version),
                ('arch', arch),
                ('database', database),
                ('description', description),
                ('url', url),
                ('filename', filename),
                ('sha256sum', sha256sum),
                ('builddate', builddate),
            )
            for attribute, value in values:
                setattr(pkg, attribute, value)
            db.session.add(pkg)
            db.session.commit()
            func(db=db, *args, **kwargs)
        return wrapper

    return decorator(func) if func else decorator
# Group id the advisory helpers in this module default to, and the matching
# display name built from it ('AVG-1').
DEFAULT_GROUP_ID = 1
DEFAULT_GROUP_NAME = 'AVG-{}'.format(DEFAULT_GROUP_ID)
def default_group_dict(overrides=None):
    """Return a dict of default group values with *overrides* applied on top.

    Args:
        overrides: Optional mapping (or iterable of key/value pairs) whose
            entries replace or extend the defaults. ``None`` means no
            overrides.

    Returns:
        A new dict on every call. ``changed`` is the string form of the
        naive epoch timestamp, ``'1970-01-01 00:00:00'``.
    """
    # datetime(1970, 1, 1) equals datetime.utcfromtimestamp(0) without going
    # through the call that is deprecated since Python 3.12.
    data = dict(cve=DEFAULT_ISSUE_ID, pkgnames='foopkg', affected='1.0-1', fixed=None,
                status=Affected.unknown.name, bug_ticket='', reference='', notes='',
                advisory_qualified=True, changed=str(datetime(1970, 1, 1)))
    if overrides is not None:
        data.update(overrides)
    return data
def create_group(func=None, id=None, status=None, severity=None,
                 affected='1.0-1', fixed=None, bug_ticket='', reference='', notes='',
                 created=datetime.utcnow(), advisory_qualified=True, issues=(DEFAULT_ISSUE_ID,), packages=('foo',),
                 changed=datetime(1970, 1, 1), count=1):
    """Decorator that stores one or more ``CVEGroup`` rows before the test runs.

    Works both bare (``@create_group``) and with arguments. For every group
    it also gets-or-creates one ``CVEGroupEntry`` per issue and one
    ``CVEGroupPackage`` per package name.

    Args:
        id: Assigned to the group only when truthy. With ``count`` greater
            than 1 the 1-based sequence number is appended to it.
        status: Group status; when falsy it is derived with
            ``affected_to_status(Affected.affected, packages[0], fixed)``.
        severity: Group severity; when falsy the highest severity among the
            referenced issues is used.
        issues: Ids of the CVEs in the group; each is fetched or created.
        packages: Package names in the group. Must be non-empty when
            *status* is not given, because the first entry is used.
        created: Creation timestamp stored on every group.
        changed: Change timestamp stored on every group; defaults to the
            naive epoch.
        count: Number of groups to create.
        affected, fixed, bug_ticket, reference, notes, advisory_qualified:
            Copied onto every group unchanged.

    The wrapped test must receive ``db`` as its first positional argument; it
    is called with ``db`` passed by keyword.
    """
    # NOTE(review): the `created` default is evaluated once, when this module
    # is imported, so every group built without an explicit `created` shares
    # that timestamp. Left as is in case tests rely on the shared value.
    # `issues`/`packages` default to tuples so no mutable object is shared
    # between calls; they are only indexed and iterated here.
    def decorator(func):
        @wraps(func)
        def wrapper(db, *args, **kwargs):
            issue_objs = [db.get_or_create(CVE, id=issue) for issue in issues]
            max_severity = highest_severity([issue.severity for issue in issue_objs])

            for num in range(1, count + 1):
                group = CVEGroup()
                if id:
                    group.id = id if count <= 1 else '{}{}'.format(id, num)
                group.status = status if status else affected_to_status(Affected.affected, packages[0], fixed)
                group.severity = severity if severity else max_severity
                group.affected = affected
                group.fixed = fixed
                group.bug_ticket = bug_ticket
                group.reference = reference
                group.notes = notes
                group.created = created
                group.advisory_qualified = advisory_qualified
                group.changed = changed

                # Commit the group before the entry and package rows that
                # reference it are fetched or created.
                db.session.add(group)
                db.session.commit()

                for issue in issue_objs:
                    db.get_or_create(CVEGroupEntry, group=group, cve=issue)
                for pkgname in packages:
                    db.get_or_create(CVEGroupPackage, pkgname=pkgname, group=group)
                db.session.commit()
            func(db=db, *args, **kwargs)
        return wrapper

    if not func:
        return decorator
    return decorator(func)
def create_advisory_content(id=DEFAULT_ADVISORY_ID, group=DEFAULT_GROUP_NAME, pkgname='foo', pkgver='1.1-1', cve='CVE-2012-1337', description='SNAFU', impact='Robots will take over', workaround='Update your machine', references=''):
    """Return the text of an advisory rendered from the given values.

    Only the arguments are interpolated: the severity, date, type and
    remote fields are fixed text in the template. ``group`` is used for both
    the ``Link`` line and the first entry under ``References``.
    """
    return f"""Arch Linux Security Advisory {id}
==========================================
Severity: Critical
Date : 2012-12-21
CVE-ID : {cve}
Package : {pkgname}
Type : arbitrary code execution
Remote : Yes
Link : https://security.archlinux.org/{group}
Summary
=======
The package {pkgname} before version {pkgver} is vulnerable to arbitrary
code execution.
Resolution
==========
Upgrade to {pkgver}.
# pacman -Syu "{pkgname}>={pkgver}"
The problem has been fixed upstream in version {pkgver}.
Workaround
==========
{workaround}
Description
===========
{description}
Impact
======
{impact}
References
==========
https://security.archlinux.org/{group}
{references}
"""
# Advisory text rendered once at import time with every default argument.
DEFAULT_ADVISORY_CONTENT = create_advisory_content()
def default_advisory_dict(overrides=None):
    """Return a dict of default advisory values with *overrides* applied on top.

    Args:
        overrides: Optional mapping (or iterable of key/value pairs) whose
            entries replace or extend the defaults. ``None`` means no
            overrides.

    Returns:
        A new dict on every call. The only default entry is ``changed``,
        the string form of the naive epoch timestamp
        (``'1970-01-01 00:00:00'``).
    """
    # datetime(1970, 1, 1) equals datetime.utcfromtimestamp(0) without going
    # through the call that is deprecated since Python 3.12.
    data = dict(changed=str(datetime(1970, 1, 1)))
    if overrides is not None:
        data.update(overrides)
    return data
def create_advisory(func=None, id=DEFAULT_ADVISORY_ID, group_package_id=DEFAULT_GROUP_ID, advisory_type=None,
                    publication=Publication.scheduled, workaround=None, impact=None, content=None, reference=None,
                    created=datetime.utcnow(), changed=datetime.utcfromtimestamp(0), count=1):
    """Decorator that stores one or more ``Advisory`` rows before the test runs.

    Works both bare (``@create_advisory``) and with arguments. The
    ``CVEGroupPackage`` with id *group_package_id* is looked up first and
    must exist. When *advisory_type* is falsy, the type is taken from the
    issues of that package's group: their common issue type if they all
    share one, otherwise ``'multiple issues'``.

    With ``count`` of 1 or less the id is used as given; with a larger
    ``count`` each advisory gets *id* followed by its 1-based sequence
    number.

    The wrapped test must receive ``db`` as its first positional argument; it
    is called with ``db`` passed by keyword.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(db, *args, **kwargs):
            group_package = CVEGroupPackage.query.filter_by(id=group_package_id).first()
            group_entries = group_package.group.issues
            distinct_types = list({entry.cve.issue_type for entry in group_entries})
            derived_type = distinct_types[0] if len(distinct_types) == 1 else 'multiple issues'
            values = (
                ('group_package_id', group_package_id),
                ('advisory_type', advisory_type or derived_type),
                ('publication', publication),
                ('workaround', workaround),
                ('impact', impact),
                ('content', content),
                ('created', created),
                ('changed', changed),
                ('reference', reference),
            )
            for sequence in range(1, count + 1):
                record = Advisory()
                record.id = '{}{}'.format(id, sequence) if count > 1 else id
                for attribute, value in values:
                    setattr(record, attribute, value)
                db.session.add(record)
            db.session.commit()
            func(db=db, *args, **kwargs)
        return wrapper

    return decorator(func) if func else decorator
def get_advisory(advisory_id=DEFAULT_ADVISORY_ID):
    """Return the result of ``Advisory.query.get`` for *advisory_id*."""
    advisory_query = Advisory.query
    return advisory_query.get(advisory_id)
def advisory_count():
    """Return the number of ``Advisory`` rows currently stored.

    The count is computed by the database through ``Query.count()`` instead
    of loading every row and measuring the resulting list.
    """
    return Advisory.query.count()