-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathrun_mongo_server.py
363 lines (298 loc) · 14.7 KB
/
run_mongo_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
import argparse
import os
import json
import subprocess
import sys
from datetime import datetime
# Log file name; the date part is fixed once, when the module is loaded.
FILE_NAME = 'mongodb-{}.log'.format(datetime.now().strftime('%Y-%m-%d'))
# Accepted log levels, mapping each level name to the label written to the log.
INFO_LEVEL = {'INFO': 'INFO', 'DEBUG': 'DEBUG', 'ERROR': 'ERROR'}
# Working directory at load time; base directory for the sample and generated config files.
PWD = os.getcwd()
def parser():
    """
    Parse the command line and dispatch to the selected sub-command.

    Sub-commands:
        create: generate the configuration files
        start:  start the services
        stop:   stop the services

    Both the ``-f/--file`` option and a sub-command are mandatory; argparse
    reports a usage error when either one is missing.

    :return: the parsed ``argparse.Namespace``, after the sub-command handler has run
    """
    argument = argparse.ArgumentParser(prog="run_mongo_server", usage="%(prog)s [options]",
                                       description="TOOL ABOUT MONGODB SHARDING CLUSTER")
    # Every handler reads args.file_path, so insist on it up front.
    argument.add_argument("-f", "--file", action="store", dest="file_path", required=True,
                          help="config file path")
    # Sub-commands are optional by default on Python 3; without one there is
    # no ``func`` attribute and ``args.func(args)`` fails with AttributeError.
    # ``dest`` is set so argparse can name the missing argument in its error.
    sp = argument.add_subparsers(dest="command")
    sp.required = True
    sp_create = sp.add_parser('create', help='create %(prog)s daemon')
    sp_stop = sp.add_parser('stop', help='stop %(prog)s daemon')
    sp_start = sp.add_parser('start', help='start %(prog)s daemon')
    sp_create.set_defaults(func=create)
    sp_stop.set_defaults(func=stop)
    sp_start.set_defaults(func=start)
    args = argument.parse_args(sys.argv[1:])
    args.func(args)
    return args
def logging(level, msg):
    """
    Append one record to the daily log file in the current working directory.

    The record has the form ``LEVEL -- timestamp -- message``. It is written
    straight to the file rather than through a shell ``echo``, so messages
    containing quotes, parentheses, ``;``, ``|`` or ``$`` are stored verbatim
    and can neither break nor inject into a shell command.

    :param level: str, log level; must be a key of INFO_LEVEL
    :param msg: str, log message
    :return: None
    :raises KeyError: if ``level`` is not a key of INFO_LEVEL
    """
    full_filename = os.path.join(os.getcwd(), FILE_NAME)
    record = "{0} -- {1} -- {2}\n".format(INFO_LEVEL[level],
                                          datetime.now().strftime('%Y-%m-%d-%H:%M:%S'),
                                          msg)
    # Explicit UTF-8: the messages logged by this script contain non-ASCII text.
    with open(full_filename, 'a', encoding='utf-8') as log_file:
        log_file.write(record)
def gen_config(json_config, prefix):
    """
    Render one config file per node of a role from that role's sample file.

    The sample ``<prefix>.cfg.sample`` is read from PWD and, for every entry
    of ``json_config[prefix]``, written to ``<prefix>_<IP>_<role>.config`` in
    PWD. On each line only the first placeholder found (in the order listed
    below) is replaced by the node's value for the key of the same name;
    lines without a placeholder are copied unchanged.

    :param json_config: dict, the parsed JSON configuration
    :param prefix: str, role name used as config key and file-name prefix
    :return: None
    """
    # (placeholder, convert value with str()?) in lookup order
    placeholders = (
        ('systemLog_path', False),
        ('storage_dbPath', False),
        ('processManagement_pidFilePath', False),
        ('net_port', True),
        ('replication_replSetName', False),
        ('sharding_configDB', False),
        ('wiredTiger_engineConfig_cacheSizeGB', True),
    )
    template_path = os.path.join(PWD, '{0}.cfg.sample'.format(prefix))
    for node in json_config[prefix]:
        target_path = os.path.join(PWD, '{0}_{1}_{2}.config'.format(prefix, node["IP"], node["role"]))
        with open(template_path, 'r') as template, open(target_path, 'w+') as rendered:
            for raw in template:
                hit = next((entry for entry in placeholders if entry[0] in raw), None)
                if hit is None:
                    rendered.write(raw)
                    continue
                token, stringify = hit
                value = node[token]
                rendered.write(raw.replace(token, str(value) if stringify else value))
def release_mongod_tasks(json_config):
    """
    Start the mongod processes: first every config server, then every shard.

    For each node the log, pid and data directories are created over ssh
    (the data directory is chown'ed to mongod:mongod) and mongod is then
    launched over ssh with the node's generated config file from PWD.
    Shard nodes additionally run ``systemctl stop mongod`` before launching.

    :param json_config: dict, the parsed JSON configuration
    :return: True when every command succeeded
    :raises Exception: any failure is logged at ERROR level and re-raised
    """
    mkdir_template = 'ssh {0} "mkdir -p {1};mkdir -p {2};mkdir -p {3};chown mongod:mongod {3}"'
    # One entry per role:
    # (config key, opening banner, mkdir log text, launch template, launch log text, closing banner)
    stages = (
        ("configsvr",
         "******************************开始启动config进程******************************",
         "release_mongod_tasks--创建config server 数据目录--{}",
         'ssh {0} "mongod --config {1}"',
         "release_mongod_tasks--启动config server进程--{}",
         "******************************config进程启动成功******************************"),
        ("shardsvr",
         "******************************开始启动shard进程******************************",
         "release_mongod_tasks--创建shard数据目录--{}",
         'ssh {0} "systemctl stop mongod; mongod --config {1}"',
         "release_mongod_tasks--启动shard进程--{}",
         "******************************shard进程启动成功******************************"),
    )
    try:
        for role, opening, mkdir_note, launch_template, launch_note, closing in stages:
            print(opening)
            for node in json_config[role]:
                log_dir = os.path.dirname(node["systemLog_path"])
                pid_dir = os.path.dirname(node["processManagement_pidFilePath"])
                db_dir = node["storage_dbPath"]
                mkdir_command = mkdir_template.format(node["IP"], log_dir, pid_dir, db_dir)
                logging("DEBUG", mkdir_note.format(mkdir_command))
                subprocess.check_call(mkdir_command, shell=True)
                config_path = os.path.join(PWD, '{0}_{1}_{2}.config'.format(role, node["IP"], node["role"]))
                up_command = launch_template.format(node["IP"], config_path)
                logging("DEBUG", launch_note.format(up_command))
                subprocess.check_call(up_command, shell=True)
            print(closing)
        return True
    except Exception as error:
        logging("ERROR", "release_mongod_tasks--进程启动失败--FAILED:{}".format(error))
        raise
def release_mongos_tasks(json_config):
    """
    Start the mongos processes.

    For each entry of ``json_config["mongos"]`` the log and pid directories
    are created over ssh, then mongos is launched over ssh with the node's
    generated config file from PWD.

    :param json_config: dict, the parsed JSON configuration
    :return: True when every command succeeded
    :raises Exception: any failure is logged at ERROR level and re-raised
    """
    try:
        print("******************************开始启动mongos进程******************************")
        for node in json_config["mongos"]:
            log_dir = os.path.dirname(node["systemLog_path"])
            pid_dir = os.path.dirname(node["processManagement_pidFilePath"])
            mkdir_command = 'ssh {0} "mkdir -p {1};mkdir -p {2}"'.format(node["IP"], log_dir, pid_dir)
            logging("DEBUG", "release_mongos_tasks--创建mongos数据目录--{}".format(mkdir_command))
            subprocess.check_call(mkdir_command, shell=True)

            config_name = 'mongos_{0}_{1}.config'.format(node["IP"], node["role"])
            up_command = 'ssh {0} "mongos --config {1}"'.format(node["IP"], os.path.join(PWD, config_name))
            logging("DEBUG", "release_mongos_tasks--启动mongos进程--{}".format(up_command))
            subprocess.check_call(up_command, shell=True)
        print("******************************mongos进程启动成功******************************")
        return True
    except Exception as error:
        logging("ERROR", "release_mongos_tasks--mongos进程启动失败--FAILED:{}".format(error))
        raise
def init_configsrv(json_config):
    """
    Build the ``rs.initiate(...)`` statement for the config server replica set.

    Every entry of ``json_config["configsvr"]`` becomes one member, numbered
    from 0 in list order, with host ``IP:net_port``. The replica set id is
    the fixed string "replconfig".

    :param json_config: dict, the parsed JSON configuration
    :return: str, the JavaScript statement
    :raises Exception: any error is printed and re-raised
    """
    try:
        # Each entry carries a trailing comma; the last one is sliced off below.
        entries = [
            '{_id:' + str(index) + ', host: ' + '"{0}:{1}"'.format(node["IP"], node["net_port"]) + '},'
            for index, node in enumerate(json_config["configsvr"])
        ]
        members = '[ ' + ''.join(entries)
        return 'rs.initiate({_id:"replconfig",configsvr:true,members:' + members[:-1] + ']})'
    except Exception as error:
        print(error)
        raise
def init_shards(json_config):
    """
    Build one ``rs.initiate(...)`` statement per shard server.

    Each entry of ``json_config["shardsvr"]`` yields a single-member replica
    set named after its ``replication_replSetName`` with host ``IP:net_port``.

    :param json_config: dict, the parsed JSON configuration
    :return: list of str, one statement per shard in list order
    :raises Exception: any error is printed and re-raised
    """
    template = 'rs.initiate({{_id:"{0}",members:[{{_id:0,host:"{1}:{2}"}}]}})'
    try:
        return [
            template.format(node["replication_replSetName"], node["IP"], node["net_port"])
            for node in json_config["shardsvr"]
        ]
    except Exception as error:
        print(error)
        raise
def init_mongos(json_config):
    """
    Build one ``sh.addShard(...)`` statement per shard server.

    Each entry of ``json_config["shardsvr"]`` yields
    ``sh.addShard("<replication_replSetName>/<IP>:<net_port>")``.

    :param json_config: dict, the parsed JSON configuration
    :return: list of str, one statement per shard in list order
    :raises Exception: any error is printed and re-raised
    """
    try:
        return [
            'sh.addShard("{0}/{1}:{2}")'.format(node["replication_replSetName"], node["IP"], node["net_port"])
            for node in json_config["shardsvr"]
        ]
    except Exception as error:
        print(error)
        raise
def init_mongod(json_config):
    """
    Initiate the replica sets of the config servers and of every shard.

    The config server statement is evaluated with the ``mongo`` shell against
    the first config server, followed by ``rs.isMaster()``. Each shard's
    statement is evaluated against that shard's own host and port.

    :param json_config: dict, the parsed JSON configuration
    :return: True when every command succeeded
    :raises Exception: any failure is logged at ERROR level and re-raised
    """
    try:
        print("******************************开始初始化config节点******************************")
        replset_js = init_configsrv(json_config=json_config)
        if replset_js is not None:
            first_config = json_config["configsvr"][0]
            config_shell = "mongo --host {0} --port {1} --eval '{2};rs.isMaster()'".format(
                first_config["IP"], first_config["net_port"], replset_js)
            logging("DEBUG", "init_mongod--初始化config server--{}".format(config_shell))
            subprocess.check_call(config_shell, shell=True)
        print("******************************config节点初始化成功******************************")

        print("******************************开始初始化shard节点******************************")
        shard_statements = init_shards(json_config=json_config)
        if shard_statements is not None:
            # The statements were built from the same list, so they pair up one to one.
            for node, statement in zip(json_config["shardsvr"], shard_statements):
                shard_shell = "mongo --host {0} --port {1} --eval '{2}'".format(
                    node["IP"], node["net_port"], statement)
                logging("DEBUG", "init_mongod--初始化shard--{}".format(shard_shell))
                subprocess.check_call(shard_shell, shell=True)
        print("******************************shard节点初始化成功******************************")
        return True
    except Exception as error:
        logging("ERROR", "init_mongod--初始化失败--{}".format(error))
        raise
def add_shards(json_config):
    """
    Register every shard with the cluster through the first mongos.

    Each ``sh.addShard(...)`` statement is evaluated with the ``mongo`` shell
    against the host and port of ``json_config["mongos"][0]``.

    :param json_config: dict, the parsed JSON configuration
    :return: True when every command succeeded
    :raises Exception: any failure is logged at ERROR level and re-raised
    """
    try:
        print("******************************开始向mongos中添加shard******************************")
        statements = init_mongos(json_config=json_config)
        if statements is not None:
            router = json_config["mongos"][0]
            host = router["IP"]
            port = router["net_port"]
            for statement in statements:
                shard_shell = "mongo --host {0} --port {1} --eval '{2}'".format(host, port, statement)
                logging("DEBUG", "add_shards--mongos增加shard--{}".format(shard_shell))
                subprocess.check_call(shard_shell, shell=True)
            added = len(statements)
            print("******************************向mongos中添加shard成功,共添加{0}个shard******************************".format(
                added))
        return True
    except Exception as error:
        logging("ERROR", "add_shards--mongos增加shard失败--{}".format(error))
        raise
def read_config(func):
    """
    Decorator that loads the JSON config file and hands it to ``func``.

    The wrapped handler is called with the parsed argparse namespace as its
    first positional argument; the wrapper reads ``file_path`` from it, parses
    that file as JSON and calls ``func(json_config=<parsed dict>)``.

    :param func: callable accepting a ``json_config`` keyword argument
    :return: the wrapping function, which returns whatever ``func`` returns
    :raises ValueError: from the wrapper, when no config file path was supplied
    """
    # Local import: functools is not among the file's top-level imports.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        config = args[0].file_path
        if not config:
            # Without this guard open(None) fails with an opaque TypeError.
            raise ValueError("config file path is required (use -f/--file)")
        with open(config, 'r') as jsonfile:
            json_config = json.load(jsonfile)
        # Propagate the handler's result instead of silently dropping it.
        return func(json_config=json_config)
    return wrapper
@read_config
def create(json_config):
    """
    Generate the config files for the config servers, the shards and mongos.

    :param json_config: dict, the parsed JSON configuration
    :return: None
    """
    print("******************************开始生成配置文件******************************")
    for role in ('configsvr', 'shardsvr', 'mongos'):
        gen_config(json_config=json_config, prefix=role)
    print("******************************配置文件生成成功******************************")
@read_config
def stop(json_config):
    """
    Kill the mongod and mongos processes on every host named in the config.

    The distinct ``IP`` values of all entries under all top-level keys are
    collected; on each host, over ssh, every process whose command line
    matches ``mongod --config`` or ``mongos --config`` is sent ``kill -9``.
    The remote pipeline ends in ``||:`` so a host with nothing to kill is
    not treated as a failure.

    :param json_config: dict, the parsed JSON configuration
    :return: None
    :raises subprocess.CalledProcessError: if an ssh command exits non-zero
    """
    print("******************************开始关闭服务******************************")
    ips = {item["IP"] for key in json_config for item in json_config[key]}
    # "\\$" keeps the backslash-dollar pair that reaches the shell while
    # avoiding the invalid "\$" escape sequence in a Python string literal.
    pipeline_tail = " --config' | grep -v grep | awk -F ' ' '{print \\$2}' | xargs kill -9 ||:\""
    # Sorted so the hosts are processed in a reproducible order.
    for ip in sorted(ips):
        stop_mongod_cmd = "ssh {0} ".format(ip) + " \"ps -ef | grep 'mongod" + pipeline_tail
        stop_mongos_cmd = "ssh {0} ".format(ip) + " \"ps -ef | grep 'mongos" + pipeline_tail
        print(stop_mongod_cmd)
        print(stop_mongos_cmd)
        # stderr=False is the integer 0, i.e. "redirect stderr to fd 0";
        # DEVNULL is what actually discards the error output.
        subprocess.check_call(stop_mongod_cmd, shell=True, stderr=subprocess.DEVNULL)
        subprocess.check_call(stop_mongos_cmd, shell=True, stderr=subprocess.DEVNULL)
    print("******************************服务关闭成功******************************")
@read_config
def start(json_config):
    """
    Bring the whole cluster up.

    Runs, in order: start mongod processes, initiate the replica sets, start
    mongos processes, add the shards. The sequence stops at the first step
    that does not return a true value; SUCCESS is printed only when all four
    steps did.

    :param json_config: dict, the parsed JSON configuration
    :return: None
    """
    print("******************************开始启动和初始化服务******************************")
    steps = (release_mongod_tasks, init_mongod, release_mongos_tasks, add_shards)
    # The generator is consumed lazily, so a falsy step short-circuits the rest.
    if all(step(json_config=json_config) for step in steps):
        print("******************************SUCCESS******************************")
if __name__ == '__main__':
    # Script entry point: run the requested sub-command and report failure
    # both on the console and through the process exit status.
    try:
        parser()
    except Exception as error:
        try:
            # NOTE(review): the message mentions a rollback, but no rollback
            # step is visible in this file - confirm the intended wording.
            logging("ERROR", "mongodb 构建失败,进行回滚: {0}".format(error))
        except Exception as log_error:
            # A broken log file must not hide the original failure.
            print("could not write to the log file: {0}".format(log_error), file=sys.stderr)
        print("******************************FAILED******************************")
        # Exit non-zero so shells and schedulers can detect the failure.
        sys.exit(1)