fightclub

dont talk about it


  • 首页

  • 分类

  • 归档

  • 标签
fightclub

一个模块化的命令行解析器实现方案

发表于 2019-11-27 | 分类于 程序设计

问题背景

大约在上个月,有这样一个需求,需要为HPC的用户提供一个统一的任务提交脚本。用户可以使用这个脚本提交任务计算。我们实际使用的任务调度系统是PBSpro。因此这个主要的功能是:

  • 为各种不同的计算软件提供一个一致的入口,对复杂的pbs提交参数进行检测
  • 在任务提交前对用户参数进行校验和检查
  • 在任务提交前根据当前队列的状态,优化提交参数

这是一个简单的需求。我个人比较喜欢argparse这个库。能够很容易实现各种模式的命令行参数。当然也有一些同学喜欢click, Fire或者是optparse,今天就不多做讨论了。

最初的版本大概是这样子的

pbs_sub.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import os
from glob import glob
from argparse import ArgumentParser
from copy import deepcopy
import logging
import json
import random
import pwd
import sh
__version__ = '1.1.0'
PBS_SERVER = 'p-shhq-hpc-pbs-m01'
QSUB = sh.Command('/opt/pbs/bin/qsub')
PBSNODES = sh.Command('/opt/pbs/bin/pbsnodes')
powerflow_run = '/home/hpcsw/PBSsubmit/run.powerflow2'
dis_run = '/home/hpcsw/PBSsubmit/run.dis2'
logger = logging.getLogger('pbs_sub')
# 一些公共函数
def free_cores(queue):
"""
free cores in queue
"""
count = 0
logger.debug(f'getting free cores in {queue}')
try:
raw_nodedata = json.loads(PBSNODES('-aS', '-F', 'json').stdout)['nodes']
raw_nodedata2 = json.loads(PBSNODES('-Saj', '-F', 'json').stdout)['nodes']
except:
logger.error('PBS error, can not get pbs info')
exit(-1)
for k, v in raw_nodedata.items():
if v.get('queue') != queue:
continue
else:
core_free = int(raw_nodedata2[k].get('ncpus f/t').split('/')[0])
count += core_free
logger.debug(f'number of free cores in {queue} is {count}')
return count
# 公共命令行参数
parser = ArgumentParser(description=__doc__,
epilog='if you have any question or suggestion, please contact with thuhak.zhou@nio.com')
parser.add_argument('-n', '--name', help='job name')
parser.add_argument('-l', '--log_level', choices=['error', 'info', 'debug'], default='info', help='logging level')
parser.add_argument('-W', '--wait', metavar='JOB_ID', help='depend on which job')
software = parser.add_subparsers(dest='software', help='supported software')
# powerflow 软件的命令行参数
powerflow = software.add_parser('powerflow', help='powerflow')
powerflow.add_argument('-q', '--queue', choices=["cfd", "cfd2", "cfdbs"], required=True, help='queue of job')
powerflow.add_argument('-o', '--outdir', help='output directory, default is as same as jobfile')
powerflow.add_argument('-c', '--core', type=int, choices=[32, 64, 128, 256, 512, 768, 1024],
help='how many cpu cores you wanna use. default value is half of the free cores in your queue')
powerflow.add_argument('jobfile', help='job file')
powerflow.add_argument('-v', '--version', choices=["4.4d", "5.3b", "5.3c", "5.4b", "5.5a", "5.5c", "5.5c2", "6-2019"],
default="5.5b", help='version of powerflow software')
powerflow.add_argument('--nts', metavar='TIMESTEPS', type=int, help='num timesteps')
pow_group = powerflow.add_mutually_exclusive_group()
powerflow.add_argument('--seed_bondaries', action='store_true', help='set seed_bondaries')
pow_group.add_argument('-r', '--resume', metavar='RESUME_FILE', help='resume file')
pow_group.add_argument('-s', '--seed', metavar='SEED_FILE', help='seed file')
powerflow.add_argument('--mme', action='store_true', help='mme checkpoint at end')
powerflow.add_argument('--full', action='store_true', help='full checkpoint at end')
powerflow.add_argument('--pt', action='store_true', help='ptherm nprocs and ptherm max unmatched ratio')
powerflow.add_argument('--dis', choices=['only', 'full'],
help='run discretize, when you set only, just run dis job, if your set full, powerflow job will be run after dis job is finish')
powerflow.add_argument('--vr', metavar='LEVEL', type=int, help='suppress vr level for discretize job')
# 其他软件参数位置
other = software.add_parser('other')
...
if __name__ == '__main__':
# 公共的
uid = os.getuid()
user = pwd.getpwuid(uid)[0]
email = user + '@nio.com'
args = parser.parse_args()
soft = args.software
waitfor = args.wait
log_level = getattr(logging, args.log_level.upper())
logging.basicConfig(level=log_level, format='%(asctime)s [%(levelname)s]: %(message)s',
datefmt="%Y-%m-%d %H:%M:%S")
for handler in logging.root.handlers:
handler.addFilter(logging.Filter('pbs_sub'))
base_args = ['-m', 'abe', '-M', email]
if waitfor:
if '.' not in waitfor:
waitfor = waitfor + '.' + PBS_SERVER
var_w = f'depend=afterok:{waitfor}'
base_args.extend(['-W', var_w])
# powerflow 处理参数的处理过程
if soft == 'powerflow':
...
# 其他软件处理过程
elif soft == 'other':
...

整个脚本的大致结构为:

  1. 库和全部变量
  2. 一些公共的辅助函数,以及某些软件所使用的函数
  3. 公共的命令行解析器
  4. 所有软件的命令行解析器作为公共命令行解析器的子解析器
  5. 公共的命令行处理过程
  6. 用if-else放置不同软件的处理函数

这个版本执行起来是没有问题的。但是依然会有几个问题:

  • 如果需要支持的软件很多,那么整个代码会变得非常长
  • 这个脚本需要经常改动。某个软件改动如果搞错可能会影响到不相干的软件执行

为了解决这个问题,有两个实现方案:

  • 实现一个公共库,把公共解析器以及公共函数放在里面,每个软件用一个单独的入口脚本,通过引用公共库来实现代码复用
  • 实现一个公共库,把公共解析器以及公共函数放在里面。再将各个软件当作插件装载进来。这个方案需要解决的问题打乱插件的代码顺序,先执行所有parser的注册,最后根据用户输入参数的解析,将请求路由到恰当的插件里

今天要讲的,就是第二种方案的一种实现

__init_subclass__魔术方法介绍

在这个方案中,使用了__initsubclass__这个实现子类的注册和回调。因此我们首先需要了解一下\_init_subclass__的作用和使用方法。

__init_subclass__是python3.6引入的一个新的魔术方法。使用这个魔术方法,可以在子类定义的过程中作为钩子被调用,作用机制和元类相似。最大的区别就是这个魔术方法只在子类构建的时候被调用,父类是不会调用的。实际用起来更为简洁,编程体验也更好。元类说到底,还是有些麻烦的。而且更为重要的是,这个魔术方法可以和元类一起使用,不会产生冲突。经常使用元类编程的同学遇到的问题可能就是无法同时使用两种元类,有了这个魔术方法,代码设计上又增加了很大的灵活性。

现在我们举个具体的例子来说明__init_subclass__是如何使用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class MetaData(type):
def __init__(cls, clsname, bases, clsdict):
super().__init__(clsname, bases, clsdict) # 可以操作cls
print('meta class')
class A(metaclass=MetaData):
data = 'A'
def __new__(cls, *args, **kwargs):
print('running new')
return super().__new__(cls, *args, **kwargs) #返回实例对象
def __init_subclass__(cls, *args, **kwargs):
print('running init_subclass') #可以操作cls
def __init__(self, *args, **kwargs):
print('running init')

我们先定义一个元类,以及一个使用该元类的类A,输出结果为

1
meta class

从输出的结果可以看到,在定义类A的时候,MetaData被触发

1
2
class B(A):
...

我们用A作为父类创建一个子类B,输出结果为:

1
2
running init_subclass
meta class

从输出的结果来看,调用的顺序是先调用__init_subclass__,再调用元类。

我们再创建一个实例看看:

1
b = B()

输出结果为:

1
2
running new
running init

这个表现符合预期,没什么多说的。我们再用B为父类创建一个孙子类来看看__init_subclass__是否会执行

1
2
class C(B):
...

输出结果为

1
2
running init_subclass
meta class

可以看到,执行结果和B一样,说明__init_subclass__是可以传递的。我们再来试试方法重写

1
2
3
4
class D(A):
def __init_subclass__(cls, *args, **kwargs):
print('rewrite')
super().__init_subclass__(cls, *args, **kwargs)
1
2
running init_subclass
meta class
1
2
class F(D):
...

执行结果为:

1
2
3
rewrite
running init_subclass
meta class

可以看到,__init_subclass__可以像常规的的方法一样实现重写。我们再来试试多重继承:

1
2
3
4
class E:
def __init_subclass__(cls, *args, **kwargs):
print('other init subclass')
super().__init_subclass__(cls, *args, **kwargs)
1
2
class G(A, E):
...

执行结果为:

1
2
running init_subclass
meta class

E中的__init_subclass__并没有执行,但是并没有有冲突错误。

1
2
class H(E, A):
...

执行结果为

1
2
3
other init subclass
running init_subclass
meta class

因为使用了supper(),A中的__init_subclass__得到了执行。看起来,多重继承的表现和普通的方法没什么两样。

经过一系列实验,最后得到的结论是在同时使用metaclass、__initsubclass__, \_new与__init的时候。在类的构建期依次调用__initsubclass__,与metaclass,而在实例构建的时候,依次调用\_new与__init。

而__init_subclass__的特性,和普通的方法区别不大。如果想有更多的了解,可以进一步参考PEP 487

利用__init_subclass__实现注册和回调机制

说完了__init_subclass__的用法以后,我们再具体看一下在这个实际案例中,如何利用这个魔术方法的特性,实现代码的解藕。

先上代码:

pbs_sub.py

1
2
3
4
5
6
#!/usr/bin/env python3.6
from software import MainParser
if __name__ == '__main__':
parser = MainParser()
parser.run()

software/__init__.py

1
2
from .mainparser import MainParser
from .powerflow import PowerFlow

software/mainparser.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
"""
main parser
"""
# author: thuhak.zhou@nio.com
from argparse import ArgumentParser
from weakref import WeakValueDictionary
import logging
from logging.handlers import RotatingFileHandler
import json
import os
from os.path import dirname, join, abspath, isfile, isdir
from string import ascii_letters, digits
import pwd
import time
import sh
__version__ = '2.1.0'
__author__ = 'thuhak.zhou@nio.com'
class classproperty:
def __init__(self, func):
self.func = func
def __get__(self, instance, cls):
if cls is None:
return self
else:
value = self.func(cls)
setattr(cls, self.func.__name__, value)
return value
class MainParser:
"""
provide common args for pbs.
argument parser of subclasses will regist in _all_software variable
the request will be route to the right software subclass
"""
_all_software = WeakValueDictionary()
logger = logging.getLogger('pbs_sub')
PBS_SERVER = 'p-shhq-hpc-pbs-m01'
QSUB = sh.Command('/opt/pbs/bin/qsub')
script_base = join(dirname(abspath(__file__)), 'run_scripts')
def __init__(self):
self.base_args = []
self.parser = ArgumentParser(description=f"This script is used for pbs job submission, version: {__version__}",
epilog=f'if you have any question or suggestion, please contact with {__author__}')
self.parser.add_argument('-n', '--name', help='job name')
self.parser.add_argument('-l', '--log_level', choices=['error', 'info', 'debug'], default='info',
help='logging level')
self.parser.add_argument('-W', '--wait', metavar='JOB_ID', help='depend on which job')
self.parser.add_argument('--free_cores', action='store_true', help='show free cpu cores by queue')
software = self.parser.add_subparsers(dest='software', help='software list')
for soft in self._all_software:
cls = self._all_software[soft]
parser = software.add_parser(cls.__software__, help=f'(script version {cls.__version__})')
cls.add_parser(parser)
@classmethod
def add_parser(cls, parser: ArgumentParser):
"""
abc interface, implement this method in subclass
"""
raise NotImplementedError
@classmethod
def handle(cls, args, base_args) -> list:
"""
abc interface, implement this method in subclass
args: argument args
base_args: all argument provided by main parser
:return all job ids in list
"""
raise NotImplementedError
def __init_subclass__(cls, **kwargs):
"""
regist subclass
"""
super().__init_subclass__(**kwargs)
if not getattr(cls, '__software__') or not getattr(cls, '__version__'):
raise NotImplementedError('you need to set __software__ and __version__ attribute in your subclass')
cls._all_software[cls.__software__] = cls
@classproperty
def default_run(cls):
"""
default run script
"""
run_script = join(cls.script_base, cls.__software__ + '.sh')
if not isfile(run_script):
cls.logger.error(f'you need to put the script {run_script} first')
exit(1)
return run_script
@classproperty
def pbs_nodes_data(cls):
"""
get node data from pbs
"""
cls.logger.debug('getting pbs node info')
try:
PBSNODES = sh.Command('/opt/pbs/bin/pbsnodes')
raw_nodedata = json.loads(PBSNODES('-aS', '-F', 'json').stdout)['nodes']
raw_nodedata2 = json.loads(PBSNODES('-Saj', '-F', 'json').stdout)['nodes']
for k in raw_nodedata.keys():
raw_nodedata[k].update(raw_nodedata2[k])
return raw_nodedata
except Exception as e:
cls.logger.error(f'PBS error, can not get pbs node info, reason: {str(e)}')
exit(-1)
@classproperty
def pbs_job_data(cls):
"""
get pbs job data
"""
cls.logger.debug('getting pbs job info')
try:
QSTAT = sh.Command('/opt/pbs/bin/qstat')
raw_data = sh.grep(QSTAT('-f', '-F', 'json'), '-v', 'Submit_arguments').stdout
job_data = json.loads(raw_data)['Jobs']
return job_data
except Exception as e:
cls.logger.error(f'PBS error, can not get pbs job info, reason: {str(e)}')
exit(-1)
@classmethod
def free_cores(cls, queue: str) -> int:
"""
get free cores in queue
"""
count = 0
cls.logger.debug(f'getting free cores in {queue}')
pbsdata = cls.pbs_nodes_data
for node in pbsdata.values():
if node.get('queue') == queue:
core_free = int(node.get('ncpus f/t').split('/')[0])
count += core_free
cls.logger.debug(f'number of free cores in {queue} is {count}')
return count
@classmethod
def all_free_cores(cls) -> dict:
"""
get all free cores in all queues
"""
from collections import defaultdict
cls.logger.debug('getting free cores')
pbsdata = cls.pbs_nodes_data
ret = defaultdict(int)
for node in pbsdata.values():
queue = node.get('queue')
if queue:
ret[queue] += int(node.get('ncpus f/t').split('/')[0])
return ret
@classmethod
def check_jobid(cls, jid: str) -> bool:
"""
check job id in pbs or not
"""
cls.logger.debug(f'checking job {jid}')
return jid in cls.pbs_job_data
@classmethod
def get_jid_info(cls, jid: str) -> dict:
cls.logger.debug(f'getting job information for {jid}')
try:
QSTAT = sh.Command('/opt/pbs/bin/qstat')
raw_info = sh.grep(QSTAT('-f', '-F', 'json', jid), '-v', 'Submit_arguments').stdout
job_info = json.loads(raw_info)['Jobs'][jid]
return job_info
except json.decoder.JSONDecodeError:
cls.logger.error(f'job {jid} is not json format')
cls.logger.debug(raw_info)
exit(1)
except Exception as e:
cls.logger.error(f'unable to get information from pbs, reason: {str(e)}')
exit(1)
@staticmethod
def replace_id(args: list, jobid: str) -> None:
"""this function will change the state of args"""
try:
w_i = args.index('-W')
args[w_i + 1] = f'depend=afterok:{jobid}'
except ValueError:
args.extend(['-W', f'depend=afterok:{jobid}'])
@staticmethod
def fix_jobname(name: str) -> str:
valid_str = ascii_letters + digits + '_'
ret = name
for c in name:
if c not in valid_str:
ret = ret.replace(c, '_')
return ret
@staticmethod
def get_ncpu(queue: str) -> int:
return 16 if queue == 'cfdbs' else 32
def run(self):
"""
program entry
"""
uid = os.getuid()
user = pwd.getpwuid(uid)[0]
args = self.parser.parse_args()
log_level = getattr(logging, args.log_level.upper())
handlers = [logging.StreamHandler()]
logdir = '/var/log/pbs_sub'
if isdir(logdir):
loghandler = RotatingFileHandler(join(logdir, user + '.log'), maxBytes=10*1024*1024, backupCount=3, encoding='utf-8')
handlers.append(loghandler)
logging.basicConfig(level=log_level, format='%(asctime)s [%(levelname)s]: %(message)s',
datefmt="%Y-%m-%d %H:%M:%S", handlers=handlers)
for handler in logging.root.handlers:
handler.addFilter(logging.Filter('pbs_sub'))
if args.free_cores:
free_cores = self.all_free_cores()
ali = max([len(x) for x in free_cores.keys()]) + 3
for q, c in free_cores.items():
space = ' ' * (ali - len(q))
print(f'{q}:{space}{c}')
return
software = args.software
email = user + '@nio.com'
waitfor = args.wait
base_args = ['-m', 'abe', '-M', email]
if waitfor:
if '.' not in waitfor:
waitfor = waitfor + '.' + self.PBS_SERVER
if not self.check_jobid(waitfor):
self.logger.debug(f'invalid job id {waitfor}')
exit(1)
var_w = f'depend=afterok:{waitfor}'
base_args.extend(['-W', var_w])
if software in self._all_software:
jids = self._all_software[software].handle(args, base_args)
time.sleep(1)
for jid in jids:
job_info = self.get_jid_info(jid)
job_stat = job_info.get('job_state')
info = f'job {jid} is in state {job_stat}'
comment = job_info.get('comment')
if comment:
info += f',comment: {comment}'
self.logger.info(info)
else:
self.parser.print_help()

software/powerflow.py

因为涉及到具体的业务逻辑,因此整个实现就被我省略了,这里只是为了演示整个结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# author: thuhak.zhou@nio.com
import os
import random
from copy import deepcopy
from glob import glob
import re
import sh
from .mainparser import MainParser
class PowerFlow(MainParser):
__software__ = 'powerflow'
__version__ = '2.1.0'
base_dir = '/home/hpcsw/EXA/powerflow'
@classmethod
def add_parser(cls, parser):
parser.add_argument('-q', '--queue', choices=["cfd", "cfd2", "cfdbs"], default="cfdbs", help='queue of job')
parser.add_argument('-c', '--core', type=int, choices=[32, 64, 128, 256, 512, 768, 1024],
help='how many cpu cores you wanna use. default value is half of the free cores in your queue')
parser.add_argument('jobfile', help='job file')
parser.add_argument('-v', '--version',
choices=["4.4d", "5.3b", "5.3c", "5.4b", "5.5a", "5.5b", "5.5c", "5.5c2", "6-2019"],
default="5.5b", help='version of powerflow')
parser.add_argument('--nts', metavar='TIMESTEPS', type=int, help='num timesteps')
parser.add_argument('--seed_bondaries', action='store_true', help='set seed_bondaries')
pow_group = parser.add_mutually_exclusive_group()
pow_group.add_argument('-r', '--resume', metavar='RESUME_FILE', help='resume file')
pow_group.add_argument('-s', '--seed', metavar='SEED_FILE', help='seed file')
parser.add_argument('--mme', action='store_true', help='mme checkpoint at end')
parser.add_argument('--full', action='store_true', help='full checkpoint at end')
parser.add_argument('--pt', action='store_true', help='enable powerTHERM')
parser.add_argument('--dis', choices=['only', 'full'],
help='run discretize, when you set only, just run dis job, if your set full, powerflow job will be run after dis job is finish')
parser.add_argument('--vr', metavar='LEVEL', type=int, help='suppress vr level for discretize job')
parser.add_argument('--postacoustics', choices=['only', 'full'],
help='run powerflow post process, contact with fuchao.wang@nio.com to get more information')
@classmethod
def default_cores(cls, queue):
total_cores = cls.free_cores(queue)
for c in (2048, 1536, 1024, 960, 512):
if total_cores >= c:
return c // 2
return 128
@classmethod
def handle(cls, args, base_args):
jids = []
...
return jids

从整个代码结构上, 我们可以看到。

公共的逻辑被放置到了MainParser中,而powerflow的业务逻辑,则被独立开来,放置到了一个单独的文件内。并通过类的继承,复用了公用的类的方法,变量,以及数据缓存。

整个软件的逻辑如下:

  1. 在MainParser类中,通过__init_subclass__,在类还在创建过程中,就将整个类对象放置到自己的全局字典中。而类创建代码,则是在import的时候执行
  2. 在MainPasrer进行实例初始化的时候,依次调用所有子类的add_parser方法,实现所有插件parser的注册
  3. 最后在实例化的mainparser对象中,通过run方法,解析用户的输入,并路由给对应的插件handle处理方法。拿到对应handle的处理后,在进行下一道工序的处理

所有的MainParser的类中,所有的子类均没有进行实例化,因为并没有必要。唯一实例化的是MainParser本身。

而当你需要添加一个新的计算软件。那么你只需要:

在software中放置一个单独的文件,写一个MainParser的子类.

在这个子类中,需要实现:

  • 定义__software,以及__version类变量。注册插件信息
  • 实现add_parser类方法,提供子命令的入口给mainparser回调
  • 实现handle类方法,提供给mainparser回调
  • 最后通过修改sotware/__init__.py将新的代码引入即可

如果觉得每次都要import一下新的代码很烦,可以定制import的过程做自动加载,但是目前来看,并没有多少必要来做这个事,还是从简的好。

总结

通过使用python元编程,我们往往可以使用极少的代码就可以实现比较复杂的设计模式。这也python是让人非常上瘾的一点。

fightclub

saltstack中的设计模式(六)

发表于 2019-02-12 | 分类于 saltstack

在今天我们介绍使用pillar进行参数化sls的方法,以及pillar数据结构设计的一般模式。
案例如下:

为通过saltstack部署的mariadb提供个性化配置

在salt中,主要使用pillar来存储动态信息。在sls中,在第一次渲染中提取出动态信息的值,编译成sls的执行指令进行第二道程序执行。

/srv/salt/services/mariadb/init.sls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
{% from "macros/setrole.sls" import setrole %}
{{ setrole("mariadb") }}
include:
- system.pkgrepos.mariadb
- .zabbix
mariadb:
pkg.installed:
- allow_updates: True
- pkgs:
- MariaDB-server
- MariaDB-client
- MariaDB-shared
- MariaDB-connect-engine
- MariaDB-backup
# - mariadb-Galera-server
# - galera
- require:
- pkgrepo: mariadb-repo
mariadb-datadir:
file.directory:
- name: /data/mariadb/data
- user: mysql
- group: mysql
- dir_mode: 750
- makedirs: True
- require:
- pkg: mariadb
mariadb-tmpdir:
file.directory:
- name: /data/mariadb/tmp
- user: mysql
- group: mysql
- dir_mode: 750
- recurse:
- user
- group
- mode
- makedirs: True
- require:
- pkg: mariadb
mariadb-innodbdir:
file.directory:
- name: /data/mariadb/innodb
- user: mysql
- group: mysql
- dir_mode: 750
- makedirs: True
- require:
- pkg: mariadb
mariadb-file:
file.managed:
- name: /etc/my.cnf.d/server.cnf
- source: salt://{{ slspath }}/templates/server.cnf
- template: jinja
- backup: minion
- require:
- pkg: mariadb
mariadb-install-db:
cmd.run:
- name: "mysql_install_db --user=mysql --defaults-file=/etc/my.cnf.d/server.cnf"
- require:
- file: /etc/my.cnf.d/server.cnf
- unless: "file -f /data/mariadb/innodb/ibdata1"
mariadb-service:
service.running:
- name: mariadb
- enable: True
- restart: True
- require:
- cmd: mariadb-install-db
- watch:
- pkg: mariadb
- file: /etc/my.cnf.d/server.cnf
mariadb-initialization:
cmd.run:
- name: "/usr/bin/mysql_upgrade;echo 'delete from mysql.user where host not in (\"localhost\", \"127.0.0.1\") or user != \"root\";drop database test;flush privileges;' | mysql -u root"
- require:
- service: mariadb-service
- unless: "test -f /var/lib/mysql/initialization.lock"
file.managed:
- name: /var/lib/mysql/initialization.lock
- user: mysql
- contents:
- "This is the Initialization lock file for MariaDB."
- onchanges:
- cmd: mariadb-initialization
/etc/logrotate.d/mysql:
file.managed:
- source: salt://{{ slspath }}/templates/log_rotate

我们把所有mariadb有关的的任务,都放在services.mariadb下面。 如果有其他额外的复杂功能,比如监控相关的,就在include下面直接链接即可。
/etc/my.cnf.d/server.cnf这个文件引用了一个jinja的模版,我们用这个jinja模版实现了动态的配置参数,如下

/srv/salt/services/mariadb/templates/server.cnf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#Initialized by Salt
{%- set conf = salt['pillar.get']('mariadb', {}) -%}
{%- macro kvset(k, v) -%}
{{ k }}={{ conf.get(k, v) }}
{%- endmacro %}
[server]
[galera]
[embedded]
[mariadb]
[mariadb-10.1]
[mysqld]
bind-address=0.0.0.0
{{ kvset("general_log", 1) }}
{{ kvset("general_log_file", "/data/mariadb/general.log") }}
slow_query_log=1
slow_query_log_file=/data/mariadb/slow_query_log.log
slow_launch_time=2
event_scheduler=1
default_storage_engine=InnoDB
character-set-client-handshake = FALSE
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
#read-only=1
{{ kvset('server-id', 1) }}
sync_binlog=1
skip_name_resolve=1
log-bin=mysql-bin
binlog_format = mixed
expire_logs_days = 3
max_binlog_size = 100M
relay-log=relay-log
log-slave-updates=true
max_relay_log_size = 100M
relay_log_purge=0
relay_log_recovery=1
master_verify_checksum = 1
slave_sql_verify_checksum = 1
port = 3306
socket = /var/lib/mysql/mysql.sock
skip-external-locking
key_buffer_size = {{ (salt['grains.get']('mem_total') * 0.05) | int }}M
max_allowed_packet = 64M
table_open_cache = 16384
sort_buffer_size = 2M
read_buffer_size = 2M
read_rnd_buffer_size = 2M
myisam_sort_buffer_size = 64M
thread_cache_size = 32
query_cache_size = 0M
datadir = /data/mariadb/data
tmpdir = /data/mariadb/tmp
innodb_file_per_table=1
innodb_data_home_dir = /data/mariadb/innodb
innodb_data_file_path = ibdata1:256M;ibdata2:256M:autoextend
innodb_autoextend_increment = 64
innodb_log_group_home_dir = /data/mariadb/innodb
innodb_buffer_pool_size = {{ (salt['grains.get']('mem_total') * 0.6) | int }}M
innodb_log_file_size = 256M
innodb_log_buffer_size = 4M
innodb_flush_log_at_trx_commit = 1
innodb_lock_wait_timeout = 50
lower_case_table_names = 1
max_connections = 2048
max_connect_errors = 64
log_bin_trust_function_creators = 1
{{ kvset("interactive_timeout", 720) }}
{{ kvset("wait_timeout", 720) }}
[mysqldump]
quick
max_allowed_packet = 256M
[mysql]
no-auto-rehash
[myisamchk]
key_buffer_size = 256M
sort_buffer_size = 256M
read_buffer = 32M
write_buffer = 32M
[mysqlhotcopy]
interactive-timeout

我们在模版中,我们先从pillar中取出mariadb的嵌套数据,并根据需要的值在其中获取数据。对于每个pillar中的数据,我们都赋予一个默认值,这样在没有额外设置pillar的情况,就和静态版本的一致。

注意到innodb_buffer_pool_size这个键的值,并不是在pillar中决定的,而是取自grains。对于根据系统情况决定的参数,都可以通过grain传进来。

在这个模版做好以后,如果有变化部署的需求。我们可以将mariadb的参数的pillar赋予给对应的主机,来实现主机指定参数。

以下几个实现方案都是不错的。

  1. 用户通过执行salt-call state.sls services.mariadb pillar=”{‘k’: ‘v’}”的方式调用。这适用于一次性的个性化部署,无需salt管理员介入
  2. 假设我们为一个项目或主机部署,那么我们在这台主机的pillar中设置mariadb的子参数
  3. 我们可以使用pillar extension,比如基于rdbms或是etcd等的。可以单独设计UI,并和salt系统集成起来,实现paas平台

无论怎样,我们都需要预留pillar中mariadb的key的位置,保留给该任务使用。假设我们常用的有几套参数,那么我们可以在pillar中按照名称放置几个参数组,例如创建目录/srv/pillar/mariadb/,在这个下面放置parm_group1,parm_group2等参数组,再把特定的参数组同主机关联起来即可实现参数组的享元。

总结

任务设计的模式,主要取决于你想如何使用这个自动化任务。
任务设计的复杂程度,取决于应用场景的复杂成都。

在今天这个场景中,该部署任务主要适用于执行半自动部署,也就是手工调用salt xx state.sls services.mariadb或者salt-call state.sls services.mariadb或者在其他自动化任务中通过include该任务执行部署的场景。

fightclub

saltstack中的设计模式(五)

发表于 2019-02-10 | 分类于 saltstack

上一篇文章中,我们介绍了通过import jinja的方式进行代码的复用,这个场景并不是很常见。更为常见的场景是使用include来进行引用

接下来是今天的案例

为每个通过salt部署的服务提供定制化的监控

我们通过salt部署了很多的服务,这些服务除了本身的配置以外,我们也希望为服务定制一些自定义zabbix监控脚本。
我们通过services.zabbix进行zabbix的部署。每当我们部署新的服务,对应的客户端脚本以及配置也会更新,更新好以后重启zabbix

假设我们需要部署的服务为mariadb,sls的入口在/srv/salt/services/mariadb/init.sls,
我们可以将监控部分独立成一个配置文件,放在/srv/salt/services/mariadb/zabbix.sls中

实现

/srv/salt/services/mariadb/init.sls

1
2
3
4
5
include:
.zabbix
otherid:
some.somefunc: []

/srv/salt/services/mariadb/zabbix.sls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
include:
- services.zabbix
/opt/scripts/mariadb_status.sh:
file.managed:
- source: salt://{{ slspath }}/scripts/mariadb_status.sh
- mode: 755
mariadb_zabbix_conf:
file.managed:
- name: /etc/zabbix/zabbix_agentd.d/userparameter_mariadb.conf
- source: salt://{{ slspath }}/templates/userparameter_mariadb.conf
- mode: 600
- user: zabbix
- group: zabbix
- require:
- file: /opt/scripts/mariadb_status.sh
- listen_in:
- service: zabbix_agentd

我们在mariadb的任务中,引用了一个zabbix子任务来实现zabbix的扩展, 在子扩展中,引用了zabbix的原始任务。
通过include引入的任务,会在编译的时候重新排序,可以避免因为ID重复导致的错误。

总结

这个案例内,主要运用到以下几个技巧:

  • 在使用include的时候,可以使用一个.带指当前目录
  • 如果换成salt://协议,可以使用带指当前目录
  • state的执行顺序可以通过REQUISITE参数进行重新编排。在配置文件切分很多份的时候,可以使用反向REQUISITE,也就是带后缀_in的参数实现解耦。

state的用法可以参考saltstack官方文档

fightclub

saltstack中的设计模式(四)

发表于 2019-02-09 | 分类于 saltstack

在上一篇文章中,我们介绍了一个技巧,在syndic环境中,让syndic和master都能进行事件处理。在此需要声明的事,salt是一个发展中的软件,通常每年都会有一到两次大的版本更新,会带来若干变更。很多geek技巧会因为版本的更新而被替换为更为简洁的方法。本篇文章是基于2018.3,很多已经同我最早从使用的2015版本的有了极大的差异。因此,在使用新的发布版前,务必阅读release note,了解有哪些关键的特性改变。

在本篇文章,我将介绍一种使用import jinja进行sls复用的方法。接下来我们进入今天的案例

为通过salt部署的服务打上标签

我们会使用salt快速部署一些服务,比如mysql, redis, mongodb, rabbitmq等等。
我们希望每次在部署以后,给这个minion添加一个对应的标签,便于搜索时候使用。

salt有pillar和grains两种方式可以实现这个功能

如果使用pillar,我们需要引入一个动态的pillar存储。我们还要再做一些用来进行写入的state模块,比较复杂。
使用grains会更简单的一些,这样,我们可以通过salt -G ‘roles:tag’按照标签进行搜索。
我们可以使用salt内置的grains.append模块对roles这个列表进行标签添加,但是这个函数不具备stateful的能力,每次执行都会把内容重新append一次。

针对这个问题,我们可以有以下几个方案:

  • 修改grains内置模块,并将代码提交给社区
  • 另外写一个state模块,比如mygrains, 实现自己想要的功能
  • 自己在sls中,通过jinja进行简单的逻辑处理

前两种方案都是推荐使用的,但是我们今天要介绍的是第三种方案,这种处理方式会更为轻量级,也更容易一些。

实现方案

/srv/salt/macros/setrole.sls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{% macro setrole() -%}
{% set roles = salt["grains.get"]("roles", []) %}
{% for role in varargs %}
{% if role not in roles %}
{{ role }}_roles:
{% if roles == [] and loop.index == 1 %}
grains.present:
- force: True
- value:
- {{ role }}
- name: roles
{% else %}
grains.append:
- value:
- {{ role }}
- name: roles
{% endif %}
{% endif %}
{% endfor %}
{%- endmacro %}

我们单独写一个jinja macro, 叫做setrole,并放在某个路径下。
我们再通过jinja import的方式,应用这个写好的宏。

比如我们写了一个dnsmasq部署的state /srv/salt/services/dnsmasq/init.sls,
我们可以在适当的地方加入两行jinja代码即可

1
2
{% from "macros/setrole.sls" import setrole %}
{{ setrole("dns", "dnsmasq") }}

这样,dns和dnsmasq都会被附加在grains 的roles里面。
唯一需要注意的是jinja import的路径和salt fileserver保持一致即可。

总结

  • 默认的sls的所使用的渲染器是jinja|yaml,jinja可以使用import进行引用,而yaml可以通过include进行引用。大部分时候,我们都可以使用include就能解决sls引用的问题, 但是直接import jinja的实现会更简单一些
  • 对于每个sls的执行逻辑是先执行jinja渲染,再执行yaml的方式,类似c语言中的编译预处理,而非根据实际代码出现的先后顺序执行。一定要注意这个逻辑上的区别
fightclub

saltstack中的设计模式(三)

发表于 2019-02-08 | 分类于 saltstack

在上一篇中,我们还留下了一个问题。就是在响应器中,我们使用了一个_syndic_handle的特殊参数。
这个参数,引出了今天我们要讨论的话题

如何在syndic上进行事件处理

在于大规模的saltstack的部署环境,往往使用master,syndic的架构。
因为设计的原因,所有syndi接收到c的事件都会由底向上传播至上层master。
导致的结果是在事件处理中,同一个事件会被重复处理多次。
为了防止这个问题,我们可以采取以下几种方法

  • 禁用syndic的事件处理配置,由master统一处理
  • 禁用master的事件处理配置,由syndic统一进程处理
  • 在事件发送的时候添加一个标记, 并在处理逻辑中解析这个标记,并做应用层的路由

第一种方案最为简单,一旦master和syndic之间出现通讯故障,会导致syndic上的事件处理失效

而第二种方案的,在有些情况下,比如有很多minion直连master的场景下,就没办法使用

综上,还是最后一种方案适用性更广一点。

思路就是这样,根据实际的架构情况可以采取不同的做法。对于我现在的场景,实现方式如下:

  1. 从syndic下面触发的事件需要在发送事件的时候添加标记,例如_syndic_handle。对于整个配置文件是从master同步过去的syndic来讲,需要客户端动态的获得信息,知道自己是否是连接到syndic上面。salt-minion可以解析自己的配置文件,获取当前的master地址。我们将内置的salt架构信息通过pillar的方式下发给客户端,客户端就可以根据这些信息决定是否要设置这个标记
  2. 在reactor对应的入口文件进行判别,根据获取data中的标记,来决定是进行处理,还是直接忽略整个处理流

具体步骤:

添加pillar文件,告诉所有minion master地址

/srv/pillar/salt_info.sls

1
2
salt_info:
core_master: 10.0.0.1

在sls中通过对比自己的master和真实master的地址,决定是否添加syndic处理标记

/srv/salt/somejob.sls

1
2
3
4
5
6
7
somejob/someevent:
event.send:
- data:
{% if salt["config.get"]("master") != salt["pillar.get"]("salt_info:core_master") %}
_syndic_handle: True
{% endif %}
- otherdata: {{ otherdata }}

reactor配置

/etc/salt/master.d/reactor.conf

1
2
3
reactor:
- somejob/someevent:
- /srv/reactors/somejob/someevent.sls

为salt-master和salt-syndic上面打上grains标签

在master上,执行:

1
salt-call grains.append roles salt-master

在sydic上执行

1
salt-call grains.append roles salt-syndic

在打好标记以后,需要确认grains最终被写到了/etc/salt/grains下面,而不是在minion的配置文件里,否则是无效的

编写reactor文件

/srv/reactors/somejob/someevent.sls

1
2
3
4
5
6
{% if not "_syndic_handle" in data["data"] or "salt-syndic" in salt["grains.get"]("roles") %}
IPARENAME:
runner.somejob:
- args:
- otherdata: {{ data["data"]["other_info"] }}
{% endif %}
fightclub

saltstack中的设计模式(二)

发表于 2019-02-07 | 分类于 saltstack

在上一篇文章中,
我用一个了案例展示了如何将外部系统的数据应用引入到salt里面,
以及通过自开发state扩展的方式实现底层逻辑,并用sls实现业务逻辑。

这次,我们将举例说明在涉及与外部系统进行数据交互的时候,如何利用salt对外部系统进行数据更新。

接下来就进入今天的案例

自动维护主机在jumpserver中的数据

jumpserver,就是所谓的跳板机。普通用户在申请好自己的服务器后,可以通过跳板机登陆自己的系统。
跳板机上面实现了很多安全逻辑,用以实现隔离与命令审计等功能。

这次的任务,我们需要做到以下几点:

  • 能够将主机的信息自动的注册进jumpserver,在主机数据变更的时候,也能进行数据更新
  • 在主机删除的时候,jumpserver的数据也能得到删除

首先,我们设想一下各种方案的可行性。我们可以采取如下的方案中的一种:

  1. 使用pillar + py renderer的方式update数据
  2. 自定义一个execution module, 并使用salt调度器周期性执行
  3. 自定义一个state module,把主要的逻辑写在module中,用sls去调用
  4. 使用sls调用写好的外部程序

对于第一种方案,我们已经在上一篇文章中使用过,既然渲染器能执行python代码,也能够直接执行update操作。但是问题在于在pillar的渲染器中,我们没办法动态的引用其他pillar里面的数据。这涉及到salt核心部分的执行逻辑。我们这次需要将aws pillar中获得的数据一部分导入到jumpserver中。因此不能同样的采用pillar渲染器去执行。

剩下的三种方案中,我更倾向在扩展的module中实现底层功能,把业务逻辑放置在sls中。底层module就像乐高积木的每一个单元,
而业务逻辑,就是把这些积木组装在一起。除了可读性以外,对于业务逻辑来说,主要要面对的问题在于经常的会变更,把业务逻辑写在扩展中,
需要将服务器端的module重新同步到客户端才能生效,而sls是实时生效且更轻量级,更适合快速开发的场景。基于这个考虑,最后一个方案更为合适。

接下来,我们可以考虑的实现方式是把对接jumpserver的api写在scripts中, 在sls中使用cmd.run去调用。但是最大的问题在于如何有效的进行安全隔离,让每个minion只能更新自己的数据,不能获得其他minion的数据,这会形成安全隐患。如果jumpserver 的api本身实现不了这些功能,那么可能需要重新做一个api server,
对jumpserver的api进行封装,实现安全的逻辑。这是可行的,虽然比较麻烦。

我们这边文章主要是基于salt现有的组件,所以肯定是以通过salt实现为最优先选择。条条大路通罗马,但是我们只选择最快捷的方式。

最后我决定采用的方式是使用sls + event -> reactor -> runner来实现这个功能

步骤如下:

编写salt runner用来对接jumpserver api

salt runner是在服务端执行的python程序,salt把runner中的某一个函数映射到salt的一组调用,并能够和salt的其他组件联动。
开发者几乎减去了所有不必要的代码,只需要关注业务逻辑即可。

因为涉及到业务逻辑,我将代码隐藏起来,只写了基本的逻辑。

runner的路径需要单独在master配置文件进行配置,需要注意的是,对于不需要使用 fileserver的,也就是通过salt://xx暴露出来的文件,
都不要放在salt fileserver的路径下面,避免出现安全上的隐患。

/srv/_runners/jumpserver.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def add_host(hostname, ipaddress, port=22, other_info=None):
'''
add host to jumpserver
'''
ret = {'result': True, 'retcode': 0, 'comment': ''}
return ret
def get_host(hostname=None, ipaddress=None):
'''
get host from jumpserver
'''
ret = {'result': True, 'retcode': 0, 'comment': ''}
return ret
def remove_host(hostname):
'''
remove host from jumpserver
'''
ret = {'result': True, 'retcode': 0, 'comment': ''}
return ret
def reset_host(ipaddress, other_info=None):
'''
reset jumpserver information
'''
ret = {'result': True, 'retcode': 0, 'comment': ''}
return ret

runner函数写好以后,可以通过salt-run module.function [args]的方式来直接调用写好的runner

编写pillar,让客户端拿到jumpserver服务器的信息

和上一篇文章中用到的方法是一样的。
不过有一个问题特别值得注意,就是在缓存中我们选择了主机的ip地址为主键,这是因为对于我们的场景,
主机名变更的比ip地址更换的更加频繁。

而对于salt来说,我们使用主机名称作为主键。我们对主机名称使用了特别的编码,让主机名称包含有结构化信息。
方便salt进行定位。

主键不同,需要付出很多额外的代码。因此在设计主键时,要全方面的考虑到各种情况。

/srv/pillar/jumpserver/init.sls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!py
try:
from redis_cache import *
cache = SimpleCache(50000, expire=7200, namespace='jumpserver')
HAS_CACHE = True
except:
HAS_CACHE = False
def run():
ret = {}
ip = [ip for ip in __grains__['ipv4'] if ip.startswith("10.")][0]
try:
ret = cache.get_json(ip)
except:
try:
import salt.runner
runner = salt.runner.RunnerClient(__opts__)
info = runner.cmd(fun='jumpserver.get_host', kwarg={'ipaddress': ip}, print_event=False)
ret = info['data'][0]
if HAS_CACHE:
cache.store_json(ip, ret)
except:
pass
return {'jumpserver': ret}

在代码中,我们直接引用了写好的runner程序获得jumpserver的数据。
我们把缓存的逻辑处理放在pillar里面,get_host函数里面就可以不通过缓存。

通过自定义grains module挖掘minion上的信息

jumpserver需要知道minion的端口号作为其中一个参数。我们可以使用grains,先将这部分数据挖掘出来。
而不必每次都去执行对应的逻辑,如果其他任务也用到这部分数据,也可以从grains中提取

/srv/salt/_grains/sshd.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
def _sshd_conf(conf_file):
'''
parse sshd config file
'''
sshdconf = {'enable_passwd': True, 'permit_root': True, 'port': 22}
try:
with open(conf_file, 'r') as f:
for l in f:
if l.startswith('Port'):
sshdconf['port'] = int(l.split()[1].lower())
if l.startswith('PasswordAuthentication'):
sshdconf['enable_passwd'] = True if l.split()[1].lower() == 'yes' else False
elif l.startswith('PermitRootLogin'):
sshdconf['permit_root'] = True if l.split()[1].lower() == 'yes' else False
break
except:
return None
return sshdconf
def run():
grains = {}
sshd = _sshd_conf('/etc/ssh/sshd_config')
if sshd:
grains['sshd'] = sshd
return grains

在sls中使用py render实现复杂逻辑

我们在sls中,需要实现的是通过event.send将信息通过salt的事件系统传送到salt-master进行处理。

sls默认的渲染器是jinja + yaml. 这本身可以实现大部分的逻辑功能,但是在某些异常复杂的场景下,依然会显得力不从心。
jinja只是一个模板语言,表达能力有限,用py渲染器反而会简单许多。

/srv/salt/jobs/jumpserver/init.sls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#!py
def run():
jump = __pillar__.get('jumpserver', {})
aws = __pillar__.get('aws', {})
qcloud = __pillar__.get('qcloud', {})
current_info = {}
other_info = {}
other_info['os'] = __grains__['os']
other_info['os_version'] = __grains__['osrelease']
other_info['os_arch'] = __grains__['osarch']
ip = [ip for ip in __grains__['ipv4'] if ip.startswith('10.')][0]
hostname = __grains__['id']
port = __salt__['grains.get']('sshd:port', 22)
if aws:
ip = aws['PrivateIpAddress']
hostname = aws['Tags']['Name']
other_info['vendor'] = 'aws'
other_info['model'] = aws['InstanceType']
public_ip = aws.get('PublicIpAddress')
if public_ip:
other_info['public_ip'] = public_ip
created_by = aws['Tags'].get('Creator')
if created_by:
other_info['created_by'] = created_by
elif qcloud:
ip = qcloud['PrivateIpAddresses'][0]
hostname = qcloud['InstanceName']
other_info['vendor'] = 'qcloud'
public_ips = qcloud.get('PublicIpAddresses')
if public_ips:
other_info['public_ip'] = public_ips[0]
created_by = qcloud['Tags'].get('Creator')
if created_by:
other_info['created_by'] = created_by
current_info['ipaddress'] = ip
if __salt__['config.get']('master') != __salt__['pillar.get']('salt_info:core_master'):
current_info['_syndic_handle'] = True
if not jump:
current_info['hostname'] = hostname
current_info['port'] = port
current_info['other_info'] = other_info
ret = {'jumpserver/add_host': {'event.send': [{'data': current_info}]}}
return ret
else:
if hostname != jump.get('hostname'):
other_info['hostname'] = hostname
if port != jump.get('port'):
other_info['port'] = port
fix_info = {k:other_info[k] for k in other_info.keys() if other_info[k] != jump[k]}
if fix_info:
current_info['other_info'] = fix_info
ret = {'jumpserver/reset_host': {'event.send': [{'data': current_info}]}}
return ret

我们将grains中的数据,以及从pillar中取出来的数据,同现有的jumpserver的数据进行比较。
因为我们使用的云平台不同,所以获取数据的渠道也有别。aws 的pillar的数据,就是在我们上一篇文章中介绍的做法。
最后根据实际情况,使用jumpserver/add_host或者jumpserver/reset_host。

配置并编辑reactor

salt reactor是一个快捷的事件驱动框架。我们只需要定义一个event和reactor的映射关系,就可以构建一个事件处理系统

reactor配置:

/etc/salt/master.d/reactor.conf

1
2
3
4
5
reator:
- 'jumpserver/add_host':
- /srv/reactor/jumpserver/add_host.sls
- 'jumpserver/reset_host':
- /srv/reactor/jumpserver/reset_host.sls

reactor编写:

/srv/reactor/jumpserver/add_host.sls

1
2
3
4
5
6
7
8
9
10
11
{% if not "_syndic_handle" in data["data"] or "salt-syndic" in salt["grains.get"]("roles") %}
add_host_to_jumpserver:
runner.jumpserver.add_host:
- args:
hostname: {{ data["id"] }}
ipaddress: {{ data["data"]["ipaddress"] }}
port: {{ data["data"]["port"] }}
{% if "other_info" in data["data"] %}
other_info: {{ data["data"]["other_info"] | tojson }}
{% endif %}
{% endif %}

/srv/reactor/jumpserver/reset_host.sls

1
2
3
4
5
6
7
{% if not "_syndic_handle" in data["data"] or "salt-syndic" in salt["grains.get"]("roles") %}
jumpserver_reset_host:
runner.jumpserver.reset_host:
- args:
ipaddress: {{ data["data"]["ipaddress"] }}
other_info: {{ data["data"]["other_info"] | tojson }}
{% endif %}

在reator中,我们将事件直接映射到salt runner。如果有复杂的场景,我们可以把reactor映射到salt的orchestrater系统。
具体用法,可以参考salt官方文档,不再赘述。

这里面有几个问题值得注意一下。 我们用到一个_syndic_handle的参数,这个原因我会再起一篇文章进行描述,暂且不表。
还有一个就是tojson的 jinja过滤器。这是salt2018.3引入的过滤器,用以解决sls不直接支持字典的问题。

处理主机删除后的jumpserver的垃圾清除

这是最后一步工作了。因为主机已经删除了,所以肯定不能通过sls来调用了。
还记得我们在上一篇文章有一个垃圾处理的外部脚本,我们只需要把runner中的remove_host函数附加到主机删除的过程里面就可以了

总结

在本次的案例中,我们采用的技术方案有

  • 使用salt runner作为service,快速的搭建一个自动处理服务
  • 使用在sls中使用py renderer来实现比较更加复杂的逻辑
  • 使用自定义grains,将客户端数据采集到salt中,供salt其他任务使用
  • 使用reactor解决安全问题,每个发出的事件,都由salt认证主机id。不过salt event系统中,并不包含minion的ip地址,因此没办法直接认证ip地址。需要对salt event进行一定的改良
fightclub

saltstack中的设计模式(一)

发表于 2019-02-06 | 分类于 saltstack

很久没做笔记了。新年期间时间充裕,可以重新开工了。
因为工作的需要,接触到saltstack。也围绕着saltstack做了很多的工作。
stackstack不仅是一个自动化工具,也是一个自动化任务的开发框架。

相较于ansible, chef等自动化工具,salt也有其独到的特点:

  • 纯python开发,插件式的架构让pythoner很容易的进行扩展
  • 分布式架构,节点minion自动连接master,不依赖于ssh,非常适合主机自动注册的场景。并且方便在客户端执行独立的逻辑
  • 对windows的支持很好

而标题中所谓的设计模式,指的式在实现具体的功能任务之外,还需要考虑以下的因素:

  • 最大化代码的复用
  • 减少不必要的调用,增加程序性能
  • 增加代码的可读性,方便团队协作
  • 降低可能出现人为失误的情况,缩小人为失误因素所带来的影响

在这个系列中,我会用一些实际的案例分享saltstack中的任务设计经验。
废话不多说,下面就是我们第一个案例

aws标签管理

在这个案例中,我们的任务是对aws主机的标签信息进行检测,并关联于salt中的其他自动化任务。

我们的需要面临的问题主要有:

  • 如何获取aws的信息,并将这部分信息应用于saltstack的其他任务
  • 如何编写标签检测的任务,并嵌入到其他业务逻辑中

获取aws主机信息

salt主要有两种存储变量的方案:

  • grains,minion根据本地主机信息进行生成
  • pillar,通过master端生成,并按照规则分配给minion,每个minion独享自己的pillar,无法得知其他minion的pillar信息

这个场景下,适配的方案显然是使用pillar。但是默认的pillar是服务端的yaml,如何将外部的动态数据变成pillar,我们有两种选择。

  • 使用pillar extension的方式将外部数据库接入salt系统,并通过一个外部脚本周期性的通过aws api将aws的数据塞进这个外部数据库
  • 使用py renderer的方式将对应的pillar换成python程序

对于第一种方案,优点在于,提供了一个一致性的数据接口,一定程度上简化了设计。但最大的问题在于pillar的获取和刷新之间没有任何联系,实时性不好。在刷新周期新注册的主机无法完成基于这些数据的自动化任务。如果需要拥有一定的实时性,通过事件驱动的方式触发数据更新。比如采用aws lambda或者是salt reactor拦截主机进入的event。实现起来都比较复杂。同时,因为事件系统本身是异步的,很难做到和依赖的任务进行同步,想要达到理想的效果实现起来十分复杂。但是如果这些问题都由外部的cmdb来解决,那无疑是一个更好的办法。

对于第二种方案,能够保证实时性,但是最大的问题在于调用次数过度频繁。这个问题可以通过在代码中引入一层缓存来解决。

平衡利弊,第二种方案更合适一些。

在pillar中存储aws信息
安装依赖服务与相关第三方库

在saltmaster上安装redis,以及对应的缓存库。我找到了一个第三方的redis缓存库来做这个工作,减少代码量。
如果希望采用别的方式,可以自己来实现缓存逻辑,也是比较简单的。

1
2
3
yum install redis -y
systemctl enable redis && systemctl start redis
pip install redis-simple-cache boto3
实现代码

/srv/pillar/aws/init.sls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!py
# author: thuhak.zhou@nio.com
from collections import Iterable
import boto3
import salt.config
from redis_cache import *
from ipaddress import ip_address, ip_network
saltconfig = salt.config.master_config('/etc/salt/master')
def _encode_all(items, ignore_type=(unicode,str)):
if isinstance(items,dict):
new_item = {}
for k in items:
new_item[k.encode('ascii')] = _encode_all(items[k])
return new_item
elif isinstance(items, Iterable) and not isinstance(items, ignore_type):
new_item = []
for x in items:
new_item.append(_encode_all(x))
return new_item
elif isinstance(items,unicode):
return items.encode('ascii')
else:
return items
def _tran_to_json(items):
if isinstance(items,dict):
new_item = {}
for k,v in items.items():
new_item[k] = _tran_to_json(v)
return new_item
elif isinstance(items, list):
new_item = []
for x in items:
new_item.append(_tran_to_json(x))
return new_item
elif isinstance(items, (str, int, float, unicode)):
return items
else:
return str(items)
def _filter_ip(ip_list):
valid_network = saltconfig['ip-range']
for raw_ip in ip_list:
ip = ip_address(raw_ip.decode('ascii'))
for raw_network in valid_network:
network = ip_network(raw_network)
if ip in network:
return raw_ip
def run():
"""get aws attribute for ec2 instance
:returns: aws attribute
"""
try:
cache = SimpleCache(50000, expire=3600, namespace='aws')
HAS_CACHE = True
except:
HAS_CACHE = False
ip = _filter_ip(__salt__["grains.get"]("ipv4"))
if HAS_CACHE:
try:
result = cache.get_json(ip)
return {'aws': result}
except:
pass
filters = [{'Name': 'private-ip-address', 'Values': [ip]}]
creds = _encode_all(saltconfig['aws']['creds'])
result = {}
for cred in creds:
ec2 = boto3.client('ec2', region_name=cred['region_name'],aws_access_key_id=cred['aws_access_key_id'], aws_secret_access_key=cred['aws_secret_access_key'])
try:
result = ec2.describe_instances(Filters=filters)['Reservations'][0]['Instances'][0]
Tags = {}
for t in result.get('Tags'):
Tags[t['Key']] = t['Value']
result['Tags'] = Tags
if HAS_CACHE:
cache.store_json(ip, _tran_to_json(result))
return {'aws': result}
except Exception as e:
continue
return {'aws': result}

我利用了salt.config 将配置同代码进行了分离。

配置文件放置在/etc/salt/master.d/aws.conf下面。

1
2
3
4
5
6
7
8
9
10
11
aws:
creds:
- region_name: cn-north-1
aws_access_key_id: xxx
aws_secret_access_key: xxx
- region_name: cn-north-1
aws_access_key_id: xxx
aws_secret_access_key: xxx
ip-range:
- 10.x.x.x/15
- 10.y.y.y/15

而run函数,则是pillar的主体,返回的dict,就是需要的结果。

而renderer的关键之处,在与我们使用__salt__["grains.get"]("ipv4")获得了minion的ip地址。虽然整个代码是在master端执行的,但是整个执行环境中导入的双下划线变量,却是在minion端执行。因此,我们也可以将客户端的ip地址作为变量,传给渲染器

剩下的代码逻辑就比较简单了,如果在缓存中已经存在信息了,就读取缓存,否则,通过aws的api直接获取信息。这样就可以解决实时性的问题

通过定期脚本更新信息

虽然新加入的主机pillar的实时性的问题得到了解决,但是更新还是需要等缓存过期。如果缓存时间过短,aws api的调用还是会很频繁,缓存时间太长。主机信息的更新变不及时。一个最简单的办法就是周期性进行一个全量的查询,进行缓存更新。同时利用这次全量查询,可以执行一些额外的任务,比如清除已经不在aws中存在的主机key, 代码和pillar的代码差别不大

/opt/scripts/saltcron.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python
# author: thuhak.zhou@nio.com
'''
refresh redis cache for aws pillar
remove minions which can not be found by aws api
'''
from collections import Iterable
import logging
from logging.handlers import RotatingFileHandler
from ipaddress import ip_address, ip_network
from threading import Thread
from Queue import Queue
import boto3
import salt.config
import salt.runner
import salt.client
import salt.wheel
from redis_cache import *
logger = logging.getLogger('aws-cache')
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s [%(levelname)s]: %(message)s', "%Y-%m-%d %H:%M:%S")
loghandler1 = RotatingFileHandler('/var/log/aws/saltcron', maxBytes=10*1024*1024, backupCount=7)
loghandler2 = logging.StreamHandler()
loghandler1.setFormatter(formatter)
loghandler2.setFormatter(formatter)
logger.addHandler(loghandler1)
logger.addHandler(loghandler2)
saltconfig = salt.config.master_config('/etc/salt/master')
wheels = salt.wheel.WheelClient(saltconfig)
runner = salt.runner.RunnerClient(saltconfig)
caller = salt.client.Caller()
def _encode_all(items, ignore_type=(unicode,str)):
if isinstance(items,dict):
new_item = {}
for k in items:
new_item[k.encode('ascii')] = _encode_all(items[k])
return new_item
elif isinstance(items, Iterable) and not isinstance(items, ignore_type):
new_item = []
for x in items:
new_item.append(_encode_all(x))
return new_item
elif isinstance(items,unicode):
return items.encode('ascii')
else:
return items
def _tran_to_json(items):
if isinstance(items,dict):
new_item = {}
for k,v in items.items():
new_item[k] = _tran_to_json(v)
return new_item
elif isinstance(items, list):
new_item = []
for x in items:
new_item.append(_tran_to_json(x))
return new_item
elif isinstance(items, (str, int, float, unicode)):
return items
else:
return str(items)
def _check_ip(ip):
try:
ip_o = ip_address(ip.decode('ascii'))
except:
return False
valid_network = saltconfig['ip-range']
for raw_network in valid_network:
network = ip_network(raw_network)
if ip_o in network:
return True
return False
def all_instance():
creds = _encode_all(saltconfig['aws']['creds'])
for cred in creds:
ec2 = boto3.client('ec2', region_name=cred['region_name'],aws_access_key_id=cred['aws_access_key_id'], aws_secret_access_key=cred['aws_secret_access_key'])
result = ec2.describe_instances()['Reservations']
for i in result:
instances = i['Instances']
for instance in instances:
Tags = {}
old_tag = instance.get('Tags')
if not old_tag:
logger.warning('find ec2 instance without tag')
logger.debug(str(instance))
else:
for t in old_tag:
Tags[t['Key']] = t['Value']
instance['Tags'] = Tags
yield _tran_to_json(instance)
def GetServerList():
'''
get dead minions
'''
minions = runner.cmd('manage.down', print_event=False)
return minions
def DelServer(serName):
'''
delete server keys
'''
logger.info('removing minion {} in salt'.format(serName))
wheels.cmd_async({'fun': 'key.delete', 'match': serName})
if __name__ == '__main__':
cache = SimpleCache(50000, expire=3600, namespace='aws')
queue = Queue()
t = Thread(target=lambda q: q.put(GetServerList()), args=(queue,))
t.start()
all_ec2 = set()
logger.info('start freshing cache for aws ec2')
for instance in all_instance():
try:
interfaces = instance.get('NetworkInterfaces')
if not interfaces:
logger.debug('instance without interface')
logger.debug(str(instance))
continue
ip = interfaces[0]['PrivateIpAddress']
if not _check_ip(ip):
logger.warning('{} is not valid private ip'.format(ip))
continue
name = instance['Tags'].get('Name')
if not name:
logger.warning('find ec2 {} without Name Tag'.format(ip))
else:
all_ec2.add(name)
logger.debug('refresh cache for {}'.format(ip))
cache.store_json(ip, instance)
except Exception as e:
logger.error('some error happen for instance {}'.format(instance))
continue
t.join()
minions = queue.get()
for minion in minions:
if minion not in all_ec2:
DelServer(minion)

如何利用pillar中aws的信息

对于大部分任务,我们可以使用salt推荐的jinja模版渲染的方式利用pillar里面的值并进行逻辑控制。
或者是通过state的onlyif,unless等参数进行约束,这种方式可以应用于绝大部分场景。

但是对于这个案例,我们需要对tag进行一些正则匹配,然后用正则匹配的结果反复应用于各种任务。
因此,我们最好一次性将所有的检测条件写好,并在其他的sls中引用这个写好的检测条件。

于是,我又再换一种设计模式

添加正则匹配的state扩展

正则匹配校验是一个同业务无关非常底层的需求。因此我将这部分功能做成state扩展,最大化代码的可重用性。

/srv/salt/_states/valuecheck.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
__virtual__ = 'valuecheck'
def check(name, rule=None, ignorecase=False, dotall=False):
'''
check if value match rule, if not specify rule, check if value exists.
'''
import re
ret = {'name': name, 'changes': {}, 'result': True, 'comment': 'value check pass'}
if rule is None:
if not name:
ret['result'] = False
ret['comment'] = 'value not match'
else:
iflag = re.IGNORECASE if ignorecase else 0
dflag = re.DOTALL if dotall else 0
flags = iflag | dflag
if not re.match(rule, name, flags=flags):
ret['result'] = False
ret['comment'] = 'value not match'
return ret
将校验逻辑写成一个独立的sls

校验逻辑已经属于业务层的代码了,因此放在sls中更为简单且易于阅读。

/srv/salt/system/checks/checktag.sls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
############################
# check if aws tag valid #
############################
{% set tags = salt["pillar.get"]("aws:Tags", {}) %}
tag_creator:
valuecheck.check:
- name: {{ tags.get("Creator", "") }}
- rule: .+
tag_owner:
valuecheck.check:
- name: {{ tags.get("Owner", "") }}
- rule: ^\w+(\.\w+){1,2}$
tag_department:
valuecheck.check:
- name: {{ tags.get("Department", "") }}
- rule: ^(ab||cd|ef)$
- ignorecase: True
tag_project:
valuecheck.check:
- name: {{ tags.get("Project", "") }}
tag_name:
valuecheck.check:
- name: {{ tags.get("Name", "") }}
- rule: ^[t|d]-aws(cn|nx|bj)-.+\d$
tag_checks:
wrap.wrap:
- require:
- valuecheck: tag_creator
- valuecheck: tag_owner
- valuecheck: tag_project
- valuecheck: tag_department
- valuecheck: tag_name

上面的valuecheck.check就是我们刚刚写好的state扩展。
但是我们如果每次做这组校验,都要写一堆条件,比较麻烦。因此最后的wrap.wrap,就是为了解决这个问题的。

warp的代码如下:

/srv/salt/_states/wrap.py

1
2
3
4
5
6
7
8
9
def wrap(name, changes=None, result=True, comment=''):
if changes is None:
changes = dict()
ret = {'name': name,
'changes': changes,
'result': result,
'comment': comment
}
return ret

如代码所见,这个函数的目的是在满足state模块接口的情况下,实现一个极为简单空逻辑,通过传入参数的不同,直接决定state的结果。我们利用这个不执行任何有意义逻辑的任务,附加上require,就可以实现简化的目的了

在业务逻辑中使用校验逻辑

终于到最后一步了。我祛除了具体的业务逻辑,代之以一个简单的例子。

somejob.sls

1
2
3
4
5
6
7
8
include:
- system.checks.checktag
somejob:
job.job:
- args
- require:
- wrap: tag_checks

在这个任务的sls中,我们通过include引用刚刚写好标签检测任务。并将这个检测点附加在对应的执行节点上。如果我们有很多任务都依赖于这个检测,那么通过highstate调用的检测只会执行一次,不会重复执行。

总结

最后针对这个案例,我们进行一下技术总结:

  • 可以通过pillar + renderer 的方式从外部系统导入数据到salt中,但需要使用缓存来降调用次数
  • 通过state扩展的方式,实现一些控制逻辑,这样代码会比使用jinja渲染更为清晰,同时避免执行顺序不通导致的逻辑错误
  • 将真实的业务逻辑以及基础代码分离解耦,让业务逻辑更为清晰且易于维护
fightclub

一次zabbix前端优化调整

发表于 2016-12-30 | 分类于 数据库

最近经常发现zabbix的dashbord的最近20个问题以及系统状态的刷新偶尔会多几秒延时,圈圈转啊转,让人很不爽,于是决定折腾一下。

最开始着手的就是数据库。

系统参数调整

最初的时候我把postgre中的fsync关闭,并且将数据最为庞大的各个history和trends的分区改成了unlogged模式。效果非常明显。

那么下一步就是对一些常规的系统参数进行精细调整,毕竟很多都是简单有效的方法。

文件系统调整

之前不知道看过哪个评测报告,说对于随机读写,ext3是要超过ext4的。虽然不知道真假,但是还是秉着宁可信其有的方式使用了的ext3分区。

对于文件系统最常见的优化就是关闭atime模式和设置更快的日志模式。

开启atime功能意味着在读取数据的时候也会造成inode的刷新,而inode的刷新也会带来文件系统日志的变更。而atime一般来说是没多大作用,关闭atime也不会出现什么不良反应,属于首先开刀的地方。

其次是日志模式。默认的ordered方式即使先写用户数据再写元数据的模式,也就是和windows一样。当然也有更慢的更安全的journal模式和更快却没那么安全的writeback模式。我的做法是把history和trends单独的表空间采取writeback模式,也算是一种折中。当然也可以完全关闭日志,不过这就太过分了。

再次是关闭jdb barrier。按照ext3 manpage的说法:

1
2
3
4
5
6
7
8
9
10
11
12
barrier=0 / barrier=1
This disables / enables the use of write barriers in the jbd code.
barrier=0 disables, barrier=1 enables (default).
This also requires an IO stack which can support barriers,
and if jbd gets an error on a barrier write,
it will disable barriers again with a warning.
Write barriers enforce proper on-disk ordering of journal commits,
making volatile disk write caches safe to use,
at some performance penalty.
If your disks are battery-backed in one way or another,
disabling barriers may safely improve performance.

jdb barrier确保日志的提交顺序。如果磁盘的有电源的,可以关闭以提高性能。
对于使用阵列存储的我来说,这个应该不必担心,关闭即可。

另外,文件系统的块大小应该也是会存在影响的。因为数据库的页面以8K为基本单位,而文件系统默认以4K为基本单位。按照直觉的,应该是修改成一致性应该会好一些。不过没亲测过,就暂缓调整了。

内核调整

对于数据库来讲,内存的调整应该是最为主要的。

为了防止页面换出,我直接禁用了交换分区。把后台的pdflush脏页回写频度调高以优化可能的fsync造成的性能影响。实际上,我应该先用strace统计一下。这个地方可以mark一下,有空再试。

对于postgre文档中建议的共享内存段以及信号量的调整,我现在没看到有什么问题,就没去调整。

最后,我将postgre的shard buffer的内存页面大小修改成2M的大页面。理论上讲,减少页面的数量能增加TLB的命中率,更高效的利用cpu缓存。对于数据库这种需要大块分配内存的应用来讲是有利的。

我的数据库内存是16G,按照官方文档中建议的方法将postgre共享内存设置成所有内存的1/4。也就是4G,其他部分用作操作系统的透明文件缓存。因为还有一些零碎的内存,所以整体需要的大页面数量,会比4G更多一些。

官方文档中的建议的方法是取master进程统计的vm峰值, 即:

1
cat /proc/$(head -1 /data/postmaster.pid)/status | grep -i vmpeak

再除以页面大小,即:

1
grep -i hugepagesize /proc/meminfo

得到的值就是nr_hugepages的值。

另外,官方文档还建议关闭操作系统的透明大页面功能。并将postgre的大页面设置成on,这样就可以让postgre独享大页面了。

我做了一个systemd一次性启动服务来实现这个需求。虽然直接用rc.local的方式更为简单,但是systemd不建议继续使用rc.local,就按照新标准来吧。

1
2
3
4
5
6
#!/usr/bin/env sh
# 禁用透明大页面
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag
1
2
3
4
5
6
7
8
9
10
11
12
13
[Unit]
Description = Disable transparent huge page
ConditionPathExists = /sys/kernel/mm/transparent_hugepage/enabled
ConditionPathExists = /sys/kernel/mm/transparent_hugepage/defrag
Before = postgresql.service
[Service]
ExecStart = /usr/sbin/disable_tran_huge_page.sh
Type = oneshot
RemainAfterExit = yes
[Install]
WantedBy=multi-user.target

表索引调整

把基础环境调整后,开始进入正题。调整具体的功能还是要从最直接的地方开刀。也就是找到两个功能卡的对应的表和sql。

表很容易猜,而sql除了查找php的源代码,其实还有更简单的方法。

在postgre中,可以使用pg_stat_statements插件,对sql的执行情况进行统计。把统计视图中调用次数最多的10个sql拉出来看看,就能查到是哪个sql有问题了。

1
select * from pg_stat_statements order by calls desc limit 10;

果然,排名第一的就是events表。语句模板为

1
select eventid,source,object.objectid,clock,value,acknowledged,ns from events where eventid = ?

最慢的执行时间为7051ms,简直弱爆了。

这张表的eventid为主键,有个默认的btree索引,但是是非聚集的。

因为可以想象的是,这个搜索的条件带有明显的聚集特性。因此,把整个表改成按照eventid进行聚集索引,是个不错的主意。为了验证这个结论,我在测试服务器上插入了1000w的模拟数据并且用pgbench进行测试,1000w行的数据在集中搜索100行数据的情况下聚集索引大概快了20%,效果还是挺明显的。

postgre中的聚集索引创建方法为

1
cluster events using events_pkey;

注意运行完cluster后要使用analyze进行重新统计,否则规划器会生成非常差劲的执行计划。
另外,需要定期运行cluster命令以更新聚集索引。

尾声

一番折腾,不知道是不是心理作用,感觉似乎快了一些。下一步可以折腾一下php。也许可以换个php 7试试看。

fightclub

拷贝photon镜像源

发表于 2016-12-19 | 分类于 日志

缘起

最近刚给vsphere升级了6.5。在6.5中内置了容器集成(VIC)功能。docker已经热了这么久了,苦于实在是没时间折腾。现在也总算找到借口换换口味。

vmware为了降低在虚拟机中运行容器的损耗,特别定制了一个极简的并且针对vmware优化过的linux发行版-photon os。就连6.5中新的vsphere server appliance也是使用该系统的。

这个系统里面只带了一些最基本的工具和命令,其他工具通过内置了一个yum兼容的包管理器tdnf来从vmware的官方源下载。因为我司的特别情况,内外网不能互联,或者说不能直接联通,只能再想办法。

最初的想法

最开始的想法很简单,在通往外部的临界服务器使用wget将整个静态网站的内容都下载下来。再通过web服务器把内容放出来,把photon下面的软件源的内容都替换成自己的web服务器就可以了。

试了下居然只下载了几个index.html。把robots检查关闭了还是如此。仔细看了下报错信息。原来是对面的源为了防止网页爬虫,使用:代替了/,再使用js在点击的时候将:替换回来。好吧,算你厉害。只能换个办法。

要不使用反向代理?

其实这并不是个好办法,因为公司的环境比较忌讳。但是偶尔试试看总还是可以的。
不过出人意料的还是失败。我几经确认链接无误,但是就是404。我对nginx玩的还不是很6,也许是某个地方出了我不知道的问题。而且nginx的debug级别日志只有error才有,看起来也是没什么作用。

这种时候,只能想要万能的办法,用tshark把数据包截获下来再分析。结果数据拿到手才想起来是https加密数据。只有拥有会话密钥才能够读取数据的内容。问题是怎么拿到nginx转发时候的会话密钥呢?查看其中openssl部分的代码也许是个主意,不过有点小题大作。事情实在太多,没时间浪费在这个上面。也许自己写爬虫把网站down下来会更快一点。

再换个办法

我之前用过pyquery,就是python版本的jquery。现在想换个scrapy用用。毕竟现成的爬虫框架会更简单一点。虽然有一定的学习曲线,但是说不定以后还能用上。现在唯一的问题就是如何处理js部分。如果图简单,我可以直接进行手工转换,这样的话只要对方每次更换js我就必须跟在后面重写爬虫,有点蠢。或者python直接解析js后在进行爬虫。据我所知,photomjs就可以做这样的事情。我可以通过selenium和phtomjs再结合scrapy来做。看scrapy的文档,似乎可以通过更换download中间件来实现这个。老实说,我特别讨厌框架。因为虽然很少的代码就能解决问题,但是需要太长的学习曲线了解这些东西的结构。如果没有说明手册,感觉简直糟糕至极。只能通过对自己进行心理暗示,这玩意以后说不定什么时候还用的上做安慰。

可是,我明明不是一个搞前端的啊!唉,不管怎么样,先开工吧。

柳暗花明

正当我在浏览器用F12打开开发者工具准备进行页面检查的时候,奇迹发生了。不知道什么原因,js的部分不见了,页面又变回了普通的html。我不知道发生了什么事,也许有我不知道的事情在发生作用。不过这样一来,wget就又能使用了。果然再使用wget,下载就正常了。

经验总结

很多时候,无论生活还是工作,就像这样,充满了不顺利和不确定性。这些不确定性让人为了一点鸡毛蒜皮的破事绞尽脑汁。如果硬要说能从中学到点什么的话,就是再次知道自己既无知,又愚蠢。这么多年过去了,不管又学会了多少东西,这种无力感始终伴随左右。

photon的镜像暂时搭好了,里面居然还包含了gcc编译器。也许我可以把zabbix客户端编译好放到photon里。md,还得去找header。

后续

wget下载好以后,发现有些文件好像没有。单独找一个文件下了一下。发现是跳转到其他下载域名后文件名过长引起的。这也是为什么反向代理404的原因。其实早该想到的,间歇性智商降低。最后的做法还是最简单的把index下下来用pyquery解析后直接传给wget下载,懒得折腾了。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#!/usr/bin/env python3
from pyquery import PyQuery as pq
from glob import glob
from urllib.parse import urljoin
import os
import requests
class NetworkError(Exception):
pass
base = 'https://dl.bintray.com/vmware/'
basepath = '/repo/photon/'
repos = ['photon_release_1.0_x86_64/noarch/','photon_release_1.0_x86_64/x86_64/','photon_updates_1.0_x86_64/noarch/','photon_updates_1.0_x86_64/x86_64/','lightwave/x86_64/','photon_extras/noarch/','photon_extras/x86_64/']
def download(repo):
curdir = os.path.join(basepath,repo)
os.chdir(curdir)
current_rpms = set(glob('*.rpm'))
url = urljoin(base,repo)
print(url)
r = requests.get(url)
if r.status_code != 200:
raise NetworkError
else:
d = pq(r.content)
target = set(d('a').text().split()) - current_rpms
for rpm in target:
if not rpm.endswith('.rpm'):
continue
else:
t = urljoin(url,rpm)
os.system('wget -c -t 0 {} -O {}'.format(t,rpm))
def main():
for repo in repos:
download(repo)
if __name__ == "__main__":
main()

我选择了手工建立原数据,如下:

1
2
3
4
5
6
7
8
9
10
11
12
#!/bin/bash
base=/repo/photon/
repos="/lightwave /photon_extras /photon_release_1.0_x86_64 /photon_updates_1.0_x86_64"
for repo in $repos
do
echo "make repo for photon "${repo}
echo
createrepo -p -d -o ${base}${repo} ${base}${repo}
echo
done
fightclub

将组策略导出成excel

发表于 2016-12-02 | 分类于 脚本

最近有个需求,要把windows组策略导出成excel,这真是无聊的需求。本来可以直接导出html格式还是很好看的。非要做这种无聊的事情。

因为组策略直接的导出是xml或者html格式,相当于nosql的文档类型。而excel则是类似关系数据库的类型,因此需要做些格式转换。

组策略导出

组策略导出使用了powershell,将全部组策略导出到指定路径下。当然也可以指定ldap的路径来进行导出。

ExportGPReport.ps1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
param(
[string]$LdapPath = "All",
[string]$OutPut = $(pwd).Path
)
if($LdapPath -eq "ALL"){
$s=Get-GPO -All
}
else{
$s=(Get-GPInheritance -Target $LdapPath).GpoLinks
}
foreach ($gpo in $s){
$out=$OutPut+"\"+$gpo.DisplayName+".html"
Get-GPOReport -Name $gpo.DisplayName -ReportType Html -Path $out
}

使用方法例如.\ExportGPReport.ps1 -OutPut c:\gpo

Get-GPOReport导出xml格式的文档会损失很多信息,看来只能导出成html再进行解析了。

格式转换

如果利用excel直接进行转换,效果简直惨不忍睹。因此只能自己设想样式进行导出。我先预想使用合并单元格的方式描绘树状图。但是导出的效果非常之烂,因此只能换一种格式。采用高级标题使用更多单元格合并的方式来展现。虽然还是很难看,不过稍微好了点。

不知道怎么用powershell实现,于是又拿起了熟悉的python。
gpo.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
#!/usr/bin/env python3
import os.path
import weakref
from glob import glob
from pyquery import PyQuery as pq
from argparse import ArgumentParser
import xlwt
style = xlwt.XFStyle()
ali = xlwt.Alignment()
borders = xlwt.Borders()
borders.left = xlwt.Borders.THICK
borders.right = xlwt.Borders.THICK
borders.top = xlwt.Borders.THICK
borders.bottom = xlwt.Borders.THICK
ali.horz = xlwt.Alignment.HORZ_CENTER
ali.vert = xlwt.Alignment.VERT_CENTER
ali.wrap = xlwt.Alignment.WRAP_AT_RIGHT
style.alignment = ali
style.borders = borders
class Node:
def __init__(self):
self._parent = None
self.height = 1
self.level = 1
@property
def parent(self):
return self._parent if self._parent is None else self._parent()
@parent.setter
def parent(self,node):
self._parent = weakref.ref(node)
def __repr__(self):
return type(self) + 'height: %s' % self.height
def show(self):
pass
def _grow(self, h):
if h == 0:
return
else:
p = self
p.height += h
if p.parent is not None:
p = p.parent
p._grow(h)
def _dive(self, d):
pass
class Branch(Node):
def __init__(self, name):
super(Branch, self).__init__()
self.name = name
self.children = []
def __str__(self):
s = 'Node:%s,height:%s' % (self.name, self.height)
return s
def show(self):
print(self)
for child in self.children:
child.show()
def add_node(self, node):
width = len(self.children)
self.children.append(node)
node.parent = self
node._dive(self.level)
if width >= 1:
self._grow(node.height)
else:
self._grow(node.height - 1)
def _dive(self, d):
self.level += d
for child in self.children:
child._dive(d)
class Leaf(Node):
def __init__(self):
super(Leaf, self).__init__()
self.table = []
def __str__(self):
return str(self.height)
def show(self):
print(self)
def add_data(self, data):
width = len(data)
if self.table == []:
width -= 1
self.table.extend(data)
self._grow(width)
def _dive(self, d):
self.level += d
def walk(p, cur_node):
for n in p.children():
child = pq(n)
if child.attr('class').startswith('he'):
for g in child.children():
grandchild = pq(g)
if grandchild.attr('class') == 'sectionTitle':
node = Branch(child.text())
cur_node.add_node(node)
cur_node = node
elif grandchild.is_('table'):
leaf = Leaf()
data = []
for t in grandchild('tr'):
td = pq(t).find('td')
if not td.attr('colspan') and td.eq(1).text() != '':
data.append((td.eq(0).text(), td.eq(1).text()))
cur_node.add_node(leaf)
leaf.add_data(data)
elif child.attr('class') == 'container':
walk(child, cur_node)
else:
continue
class GPOTree:
def __init__(self, title, ctree, utree):
self.title = title
self.ctree = ctree
self.utree = utree
@classmethod
def loadfile(cls, filename):
gpo = cls.__new__(cls)
with open(filename, 'rt', encoding='utf-16') as f:
d = pq(''.join(f.readlines()))
gpo.title = d('head').find('title').text().replace(' ', '_')[:31]
p = g = d('div.gposummary')
for i in range(10):
p = p.next()
if p.text() == '计算机配置(已启用)':
c = p.next()
C = Branch('计算机配置')
walk(c, C)
gpo.ctree = C
elif p.text() == '用户配置(已启用)':
u = p.next()
U = Branch('用户配置')
walk(u, U)
gpo.utree = U
break
return gpo
def export_to_excel_tree(self, filename):
def traval(node, ws, x, y, style):
if type(node) is Branch:
ws.write_merge(x, x + node.height - 1, y, y, node.name, style)
# print('write_merge(%s,%s,%s,%s,%s)'%(x,x+node.height-1,y,y,node.name))
y += 1
for child in node.children:
traval(child, ws, x, y, style)
x += child.height
elif type(node) is Leaf:
for i, entry in enumerate(node.table):
ws.write_merge(x + i, x + i, y, y, entry[0], style)
ws.write_merge(x + i, x + i, y + 1, y + 1, entry[1], style)
# print('write_merge(%s,%s,%s,%s,%s)'%(x+i,x+i,y,y,entry))
else:
return
wb = xlwt.Workbook(encoding='utf-8')
ws = wb.add_sheet(self.title)
traval(self.ctree, ws, 0, 0, style)
traval(self.utree, ws, self.ctree.height, self.ctree.height, style)
wb.save(filename)
def export_to_excel_table(self, filename):
header_map = {1: 5, 2: 4, 3: 3, 4: 2, 5: 1}
def traval(node, ws):
nonlocal line
if type(node) is Branch:
ws.write_merge(line, line, 0, header_map.get(
node.level, 1), node.name, style)
line += 2
for child in node.children:
traval(child, ws)
elif type(node) is Leaf:
for i, entry in enumerate(node.table):
ws.write(line + i, 0, entry[0], style)
ws.write(line + i, 1, entry[1], style)
line += node.height + 2
else:
return
wb = xlwt.Workbook(encoding='utf-8')
ws = wb.add_sheet(self.title)
line = 0
traval(self.ctree, ws)
traval(self.utree, ws)
wb.save(filename)
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('-s', '--source', type=str,
required=True, help='组策略导出文件存储位置')
parser.add_argument('-o', '--output', type=str,
required=True, help='组策略文件导出位置')
args = parser.parse_args()
source = args.source
output = args.output
if not os.path.isdir(source):
raise SystemExit('source目录不存在')
if not os.path.isdir(output):
raise SystemExit('output目录不存在')
gpo_files = glob(os.path.join(source, '*.html'))
for f in gpo_files:
gpo = GPOTree.loadfile(f)
o = os.path.join(output, f.replace(
source, output).replace('html', 'xls'))
gpo.export_to_excel_table(o)

使用方法举例:

python3 -s 'c:\gpo' -o e:\grouppolicy

其他问题

  • 本来想着把两个脚本粘起来,后来又想想似乎没什么必要。
  • 原来想着使用单个文件多个表单的方式。结果发现xlutils copy的表单居然没有style。再想想不值得再花时间弄了,就这样吧。
12
thuhak

thuhak

hit me as hard as you can

14 日志
7 分类
12 标签
© 2019 thuhak
由 Hexo 强力驱动
主题 - NexT.Muse