forked from YichenQiu/deepdesign.space
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimages_cleanup.py
executable file
·332 lines (260 loc) · 13.6 KB
/
images_cleanup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
import cvlib # high level module, uses YOLO model with the find_common_objects method
import cv2 # image/video manipulation, allows us to pass frames to cvlib
from argparse import ArgumentParser
import os
import sys
from datetime import datetime
#import smtplib, ssl # for sending email alerts
import imghdr
#from imutils.object_detection import non_max_suppression
import numpy as np
import math
# these will need to be fleshed out to not miss any formats
IMG_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.gif', '.JPEG', '.JPG','.PNG','.WEBP']
# used to make sure we are at least examining one valid file
VALID_FILE_ALERT = False
# EAST text-detector weights, loaded once at import time and shared by textChecker().
# NOTE(review): the .pb file must be in the working directory or this line raises;
# consider lazy-loading or a configurable path — confirm deployment layout.
net = cv2.dnn.readNet("frozen_east_text_detection.pb")
def decode(scores, geometry, scoreThresh):
    """Convert raw EAST detector outputs into rotated boxes and confidences.

    scores      -- (1, 1, H, W) text/no-text probability map
    geometry    -- (1, 5, H, W) per-cell box geometry: four edge distances
                   (top, right, bottom, left) plus a rotation angle
    scoreThresh -- minimum score for a cell to produce a detection

    Returns [detections, confidences] where each detection is
    ((centerX, centerY), (width, height), angle_in_degrees), the format
    expected by cv2.dnn.NMSBoxesRotated.
    """
    assert len(scores.shape) == 4, "Incorrect dimensions of scores"
    assert len(geometry.shape) == 4, "Incorrect dimensions of geometry"
    assert scores.shape[0] == 1, "Invalid dimensions of scores"
    assert geometry.shape[0] == 1, "Invalid dimensions of geometry"
    assert scores.shape[1] == 1, "Invalid dimensions of scores"
    assert geometry.shape[1] == 5, "Invalid dimensions of geometry"
    assert scores.shape[2] == geometry.shape[2], "Invalid dimensions of scores and geometry"
    assert scores.shape[3] == geometry.shape[3], "Invalid dimensions of scores and geometry"

    rows = scores.shape[2]
    cols = scores.shape[3]
    detections = []
    confidences = []
    for row in range(rows):
        # Pull out this row of every channel once, outside the column loop.
        row_scores = scores[0, 0, row]
        d0 = geometry[0, 0, row]
        d1 = geometry[0, 1, row]
        d2 = geometry[0, 2, row]
        d3 = geometry[0, 3, row]
        row_angles = geometry[0, 4, row]
        for col in range(cols):
            score = row_scores[col]
            if score < scoreThresh:
                continue
            # Feature maps are 4x downsampled relative to the input image.
            base_x = col * 4.0
            base_y = row * 4.0
            theta = row_angles[col]
            cos_t = math.cos(theta)
            sin_t = math.sin(theta)
            box_h = d0[col] + d2[col]
            box_w = d1[col] + d3[col]
            # Rotate the right/bottom edge distances back into image space
            # to find the box's reference corner offset.
            off_x = base_x + cos_t * d1[col] + sin_t * d2[col]
            off_y = base_y - sin_t * d1[col] + cos_t * d2[col]
            corner1 = (-sin_t * box_h + off_x, -cos_t * box_h + off_y)
            corner3 = (-cos_t * box_w + off_x, sin_t * box_w + off_y)
            center = (0.5 * (corner1[0] + corner3[0]), 0.5 * (corner1[1] + corner3[1]))
            detections.append((center, (box_w, box_h), -theta * 180.0 / math.pi))
            confidences.append(float(score))
    return [detections, confidences]
# function takes a file name(full path), checks that file for human shaped objects
# saves the frames with people detected into directory named 'save_directory'
def humanChecker(video_file_name, save_directory, yolo='yolov4', continuous=False, nth_frame=10, confidence=.65, gpu=True):
    """Run YOLO person detection on an image file.

    video_file_name -- full path to the file to examine
    save_directory  -- directory where annotated hit frames are written
    yolo            -- cvlib model name ('yolov4' or 'yolov4-tiny')
    continuous      -- when True, keep scanning after the first person hit
    nth_frame       -- frame stride (only meaningful for video; images use 1)
    confidence      -- detection confidence threshold passed to cvlib
    gpu             -- forwarded to cvlib's enable_gpu

    Returns (is_human_found, analyze_error). Also sets the module-level
    VALID_FILE_ALERT flag when at least one readable image was examined.
    """
    global VALID_FILE_ALERT
    is_human_found = False
    analyze_error = False
    is_valid = False
    # incremented per detection so each saved snapshot gets a unique name
    person_detection_counter = 0

    # Only files with a known image extension are analyzed.
    if os.path.splitext(video_file_name)[1] in IMG_EXTENSIONS:
        frame = cv2.imread(video_file_name)  # our frame will just be the image
        if frame is not None:
            # Sentinel so range(1, frame_count - 6, 1) below runs exactly once
            # for a still image (leftover from the original video-scanning loop).
            frame_count = 8
            nth_frame = 1
            VALID_FILE_ALERT = True
            is_valid = True
        else:
            # extension looked right but OpenCV couldn't decode it
            analyze_error = True

    if is_valid:
        # Examine every nth_frame; for images this loop body runs once.
        for frame_number in range(1, frame_count - 6, nth_frame):
            try:
                bbox, labels, conf = cvlib.detect_common_objects(frame, model=yolo, confidence=confidence, enable_gpu=gpu)
            except Exception as e:
                print(e)
                analyze_error = True
                break
            if 'person' in labels:
                person_detection_counter += 1
                is_human_found = True
                # save a copy of the frame with bounding boxes drawn on it
                marked_frame = cvlib.object_detection.draw_bbox(frame, bbox, labels, conf, write_conf=True)
                save_file_name = os.path.basename(os.path.splitext(video_file_name)[0]) + '-human-' + str(person_detection_counter) + '.jpeg'
                # os.path.join is portable; the old '\\\\' concatenation was Windows-only
                cv2.imwrite(os.path.join(save_directory, save_file_name), marked_frame)
            if continuous is False:
                break
    return is_human_found, analyze_error
def textChecker(video_file_name, save_directory, continuous=False, nth_frame=10, confidence=.65, gpu=False):
    """Detect text in an image using the module-level EAST model.

    video_file_name -- full path to the file to examine
    save_directory  -- directory where a copy of text-bearing images is saved

    continuous, nth_frame, confidence and gpu are accepted for interface
    parity with humanChecker but are not used here.

    Returns True when at least one rotated text box survives NMS.
    """
    text_detection_counter = 0
    is_text_found = False
    if os.path.splitext(video_file_name)[1] in IMG_EXTENSIONS:
        image = cv2.imread(video_file_name)
        if image is None:
            # unreadable/corrupt file -- nothing to analyze
            # (the original crashed here on image.copy())
            return is_text_found
        orig = image.copy()
        # EAST requires input dimensions that are multiples of 32;
        # 320x320 is the canonical size from the OpenCV sample.
        (newW, newH) = (320, 320)
        image = cv2.resize(image, (newW, newH))
        (H, W) = image.shape[:2]
        # Output layers: text probabilities and box geometry respectively.
        layerNames = [
            "feature_fusion/Conv_7/Sigmoid",
            "feature_fusion/concat_3"]
        # Mean values are the BGR channel means EAST was trained with.
        blob = cv2.dnn.blobFromImage(image, 1.0, (W, H), (123.68, 116.78, 103.94), swapRB=True, crop=False)
        net.setInput(blob)
        (scores, geometry) = net.forward(layerNames)
        # Deliberately extreme thresholds: only flag unmistakable text.
        [boxes, confidences] = decode(scores, geometry, 0.9999)
        indices = cv2.dnn.NMSBoxesRotated(boxes, confidences, 0.9999, 0.9999)
        if len(indices) > 0:
            text_detection_counter += 1
            is_text_found = True
            # save the un-resized original alongside the log entry
            save_file_name = os.path.basename(os.path.splitext(video_file_name)[0]) + '-text-' + str(text_detection_counter) + '.jpeg'
            cv2.imwrite(os.path.join(save_directory, save_file_name), orig)
    return is_text_found
def dimChecker(video_file_name, save_directory, continuous=False, nth_frame=10, confidence=.65, gpu=False):
    """Flag files whose shortest image side is under 300 pixels.

    video_file_name -- full path to the file to examine
    save_directory  -- directory where a copy of flagged small images is saved

    continuous, nth_frame, confidence and gpu are accepted for interface
    parity with humanChecker but are not used here.

    Non-image extensions (and unreadable images) are also flagged so the
    caller removes them. Returns True when the file counts as 'small'.
    """
    small_detection_counter = 0
    is_small_found = False
    if os.path.splitext(video_file_name)[1] in IMG_EXTENSIONS:
        image = cv2.imread(video_file_name)
        if image is None:
            # corrupt/unreadable image: flag it like a non-image file
            # (the original crashed here on image.copy())
            return True
        orig = image.copy()
        (H, W) = image.shape[:2]
        if min(H, W) < 300:
            small_detection_counter += 1
            is_small_found = True
            save_file_name = os.path.basename(os.path.splitext(video_file_name)[0]) + '-small-' + str(small_detection_counter) + '.jpeg'
            cv2.imwrite(os.path.join(save_directory, save_file_name), orig)
    else:
        # not a recognized image extension: flag it for removal
        small_detection_counter += 1
        is_small_found = True
    return is_small_found
# takes a directory and returns all files and directories within
def getListOfFiles(dir_name):
    """Recursively collect files under dir_name, skipping hidden entries.

    dir_name -- directory to walk

    Returns a list of (containing_directory, full_path) tuples so callers
    know both where each file lives and how to open it. Entries whose name
    starts with '.' (hidden files and directories) are ignored.
    """
    all_files = list()
    for entry in os.listdir(dir_name):
        # ignore hidden files and directories
        if entry.startswith('.'):
            continue
        # os.path.join alone is enough; the old dir_name+os.sep+os.sep
        # concatenation produced doubled separators in every returned path
        full_path = os.path.join(dir_name, entry)
        if os.path.isdir(full_path):
            # recurse into subdirectories
            all_files = all_files + getListOfFiles(full_path)
        else:
            all_files.append((dir_name, full_path))
    return all_files
#############################################################################################################################
if __name__ == "__main__":
    # Command-line entry point: scan a directory (or one file) of images,
    # sorting out pictures that contain people, are too small, or contain text.
    parser = ArgumentParser()
    parser.add_argument('-d', '--directory', default='', help='Path to video folder')
    parser.add_argument('-f', default='', help='Used to select an individual file')
    parser.add_argument('--tiny_yolo', action='store_true', help='Flag to indicate using YoloV4-tiny model instead of the full one. Will be faster but less accurate.')
    parser.add_argument('--confidence', type=int, choices=range(1,100), default=65, help='Input a value between 1-99. This represents the percent confidence you require for a hit. Default is 65')
    parser.add_argument('--gpu', action='store_true', help='Attempt to run on GPU instead of CPU. Requires Open CV compiled with CUDA enables and Nvidia drivers set up correctly.')
    args = vars(parser.parse_args())

    # decide which model we'll use; tiny is faster but less accurate
    if args['tiny_yolo']:
        yolo_string = 'yolov4-tiny'
    else:
        yolo_string = 'yolov4'

    # exactly one of -f / -d must be supplied
    if args['f'] == '' and args['directory'] == '':
        print('You must select either a directory with -d <directory> or a file with -f <file name>')
        sys.exit(1)
    if args['f'] != '' and args['directory'] != '':
        # fixed: the original used 'can''t' (adjacent literals), printing "cant"
        print('Must select either -f or -d but can\'t do both')
        sys.exit(1)

    confidence_percent = args['confidence'] / 100
    gpu_flag = args['gpu']

    # create a timestamped directory to hold snapshots and the log file
    time_stamp = datetime.now().strftime('%m%d%Y-%H-%M-%S')
    os.mkdir(time_stamp)
    print('Beginning Detection')
    print(f'Directory {time_stamp} has been created')
    print(f"GPU is set to {args['gpu']}")
    print('\n\n')
    print(datetime.now().strftime('%m%d%Y-%H:%M:%S'))

    # open a log file and loop over all our image files
    with open(os.path.join(time_stamp, time_stamp + '.txt'), 'w') as log_file:
        if args['f'] == '':
            video_directory_list = getListOfFiles(args['directory'])
        else:
            # BUG FIX: single-file mode must yield (directory, path) tuples to
            # match getListOfFiles' output; the original appended a bare string,
            # which broke the tuple unpacking in the loop below.
            single_file = args['f']
            video_directory_list = [(os.path.dirname(single_file) or '.', single_file)]

        working_on_counter = 1
        # our own output directories; skip them when re-scanning
        path_tuple = ("humans", "small", "texts")
        for (file_path, video_file) in video_directory_list:
            if file_path.endswith(path_tuple):
                working_on_counter += 1
                continue
            # make sure each per-category output directory exists
            for sub_dir in path_tuple:
                sub_path = os.path.join(file_path, sub_dir)
                if not os.path.exists(sub_path):
                    os.mkdir(sub_path)

            # check for people first; any hit is logged and the file removed
            human_detected, error_detected = humanChecker(str(video_file), os.path.join(file_path, 'humans'), yolo=yolo_string, confidence=confidence_percent, gpu=gpu_flag)
            if human_detected:
                print(f'Human detected in {video_file}')
                log_file.write(f'{video_file} \n')
                os.remove(video_file)
            elif dimChecker(str(video_file), os.path.join(file_path, 'small'), confidence=confidence_percent, gpu=gpu_flag):
                print(f'Small dimensions detected in {video_file}')
                log_file.write(f'{video_file} \n')
                os.remove(video_file)
            elif textChecker(str(video_file), os.path.join(file_path, 'texts'), confidence=confidence_percent, gpu=gpu_flag):
                print(f'Text detected in {video_file}')
                log_file.write(f'{video_file} \n')
                os.remove(video_file)
            if error_detected:
                print(f'\nError in analyzing {video_file}')
                log_file.write(f'Error in analyzing {video_file} \n')
            working_on_counter += 1

    if VALID_FILE_ALERT is False:
        print('No valid image or video files were examined')
    print(datetime.now().strftime('%m%d%Y-%H:%M:%S'))