fightclub

saltstack中的设计模式(一)

很久没做笔记了。新年期间时间充裕,可以重新开工了。
因为工作的需要,接触到saltstack。也围绕着saltstack做了很多的工作。
stackstack不仅是一个自动化工具,也是一个自动化任务的开发框架。

相较于ansible, chef等自动化工具,salt也有其独到的特点:

  • 纯python开发,插件式的架构让pythoner很容易的进行扩展
  • 分布式架构,节点minion自动连接master,不依赖于ssh,非常适合主机自动注册的场景。并且方便在客户端执行独立的逻辑
  • 对windows的支持很好

而标题中所谓的设计模式,指的式在实现具体的功能任务之外,还需要考虑以下的因素:

  • 最大化代码的复用
  • 减少不必要的调用,增加程序性能
  • 增加代码的可读性,方便团队协作
  • 降低可能出现人为失误的情况,缩小人为失误因素所带来的影响

在这个系列中,我会用一些实际的案例分享saltstack中的任务设计经验。
废话不多说,下面就是我们第一个案例

aws标签管理

在这个案例中,我们的任务是对aws主机的标签信息进行检测,并关联于salt中的其他自动化任务。

我们的需要面临的问题主要有:

  • 如何获取aws的信息,并将这部分信息应用于saltstack的其他任务
  • 如何编写标签检测的任务,并嵌入到其他业务逻辑中

获取aws主机信息

salt主要有两种存储变量的方案:

  • grains,minion根据本地主机信息进行生成
  • pillar,通过master端生成,并按照规则分配给minion,每个minion独享自己的pillar,无法得知其他minion的pillar信息

这个场景下,适配的方案显然是使用pillar。但是默认的pillar是服务端的yaml,如何将外部的动态数据变成pillar,我们有两种选择。

  • 使用pillar extension的方式将外部数据库接入salt系统,并通过一个外部脚本周期性的通过aws api将aws的数据塞进这个外部数据库
  • 使用py renderer的方式将对应的pillar换成python程序

对于第一种方案,优点在于,提供了一个一致性的数据接口,一定程度上简化了设计。但最大的问题在于pillar的获取和刷新之间没有任何联系,实时性不好。在刷新周期新注册的主机无法完成基于这些数据的自动化任务。如果需要拥有一定的实时性,通过事件驱动的方式触发数据更新。比如采用aws lambda或者是salt reactor拦截主机进入的event。实现起来都比较复杂。同时,因为事件系统本身是异步的,很难做到和依赖的任务进行同步,想要达到理想的效果实现起来十分复杂。但是如果这些问题都由外部的cmdb来解决,那无疑是一个更好的办法。

对于第二种方案,能够保证实时性,但是最大的问题在于调用次数过度频繁。这个问题可以通过在代码中引入一层缓存来解决。

平衡利弊,第二种方案更合适一些。

在pillar中存储aws信息
安装依赖服务与相关第三方库

在saltmaster上安装redis,以及对应的缓存库。我找到了一个第三方的redis缓存库来做这个工作,减少代码量。
如果希望采用别的方式,可以自己来实现缓存逻辑,也是比较简单的。

1
2
3
yum install redis -y
systemctl enable redis && systemctl start redis
pip install redis-simple-cache boto3
实现代码

/srv/pillar/aws/init.sls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!py
# author: thuhak.zhou@nio.com
from collections import Iterable
import boto3
import salt.config
from redis_cache import *
from ipaddress import ip_address, ip_network
saltconfig = salt.config.master_config('/etc/salt/master')
def _encode_all(items, ignore_type=(unicode,str)):
if isinstance(items,dict):
new_item = {}
for k in items:
new_item[k.encode('ascii')] = _encode_all(items[k])
return new_item
elif isinstance(items, Iterable) and not isinstance(items, ignore_type):
new_item = []
for x in items:
new_item.append(_encode_all(x))
return new_item
elif isinstance(items,unicode):
return items.encode('ascii')
else:
return items
def _tran_to_json(items):
if isinstance(items,dict):
new_item = {}
for k,v in items.items():
new_item[k] = _tran_to_json(v)
return new_item
elif isinstance(items, list):
new_item = []
for x in items:
new_item.append(_tran_to_json(x))
return new_item
elif isinstance(items, (str, int, float, unicode)):
return items
else:
return str(items)
def _filter_ip(ip_list):
valid_network = saltconfig['ip-range']
for raw_ip in ip_list:
ip = ip_address(raw_ip.decode('ascii'))
for raw_network in valid_network:
network = ip_network(raw_network)
if ip in network:
return raw_ip
def run():
"""get aws attribute for ec2 instance
:returns: aws attribute
"""
try:
cache = SimpleCache(50000, expire=3600, namespace='aws')
HAS_CACHE = True
except:
HAS_CACHE = False
ip = _filter_ip(__salt__["grains.get"]("ipv4"))
if HAS_CACHE:
try:
result = cache.get_json(ip)
return {'aws': result}
except:
pass
filters = [{'Name': 'private-ip-address', 'Values': [ip]}]
creds = _encode_all(saltconfig['aws']['creds'])
result = {}
for cred in creds:
ec2 = boto3.client('ec2', region_name=cred['region_name'],aws_access_key_id=cred['aws_access_key_id'], aws_secret_access_key=cred['aws_secret_access_key'])
try:
result = ec2.describe_instances(Filters=filters)['Reservations'][0]['Instances'][0]
Tags = {}
for t in result.get('Tags'):
Tags[t['Key']] = t['Value']
result['Tags'] = Tags
if HAS_CACHE:
cache.store_json(ip, _tran_to_json(result))
return {'aws': result}
except Exception as e:
continue
return {'aws': result}

我利用了salt.config 将配置同代码进行了分离。

配置文件放置在/etc/salt/master.d/aws.conf下面。

1
2
3
4
5
6
7
8
9
10
11
aws:
creds:
- region_name: cn-north-1
aws_access_key_id: xxx
aws_secret_access_key: xxx
- region_name: cn-north-1
aws_access_key_id: xxx
aws_secret_access_key: xxx
ip-range:
- 10.x.x.x/15
- 10.y.y.y/15

而run函数,则是pillar的主体,返回的dict,就是需要的结果。

而renderer的关键之处,在与我们使用__salt__["grains.get"]("ipv4")获得了minion的ip地址。虽然整个代码是在master端执行的,但是整个执行环境中导入的双下划线变量,却是在minion端执行。因此,我们也可以将客户端的ip地址作为变量,传给渲染器

剩下的代码逻辑就比较简单了,如果在缓存中已经存在信息了,就读取缓存,否则,通过aws的api直接获取信息。这样就可以解决实时性的问题

通过定期脚本更新信息

虽然新加入的主机pillar的实时性的问题得到了解决,但是更新还是需要等缓存过期。如果缓存时间过短,aws api的调用还是会很频繁,缓存时间太长。主机信息的更新变不及时。一个最简单的办法就是周期性进行一个全量的查询,进行缓存更新。同时利用这次全量查询,可以执行一些额外的任务,比如清除已经不在aws中存在的主机key, 代码和pillar的代码差别不大

/opt/scripts/saltcron.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python
# author: thuhak.zhou@nio.com
'''
refresh redis cache for aws pillar
remove minions which can not be found by aws api
'''
from collections import Iterable
import logging
from logging.handlers import RotatingFileHandler
from ipaddress import ip_address, ip_network
from threading import Thread
from Queue import Queue
import boto3
import salt.config
import salt.runner
import salt.client
import salt.wheel
from redis_cache import *
logger = logging.getLogger('aws-cache')
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s [%(levelname)s]: %(message)s', "%Y-%m-%d %H:%M:%S")
loghandler1 = RotatingFileHandler('/var/log/aws/saltcron', maxBytes=10*1024*1024, backupCount=7)
loghandler2 = logging.StreamHandler()
loghandler1.setFormatter(formatter)
loghandler2.setFormatter(formatter)
logger.addHandler(loghandler1)
logger.addHandler(loghandler2)
saltconfig = salt.config.master_config('/etc/salt/master')
wheels = salt.wheel.WheelClient(saltconfig)
runner = salt.runner.RunnerClient(saltconfig)
caller = salt.client.Caller()
def _encode_all(items, ignore_type=(unicode,str)):
if isinstance(items,dict):
new_item = {}
for k in items:
new_item[k.encode('ascii')] = _encode_all(items[k])
return new_item
elif isinstance(items, Iterable) and not isinstance(items, ignore_type):
new_item = []
for x in items:
new_item.append(_encode_all(x))
return new_item
elif isinstance(items,unicode):
return items.encode('ascii')
else:
return items
def _tran_to_json(items):
if isinstance(items,dict):
new_item = {}
for k,v in items.items():
new_item[k] = _tran_to_json(v)
return new_item
elif isinstance(items, list):
new_item = []
for x in items:
new_item.append(_tran_to_json(x))
return new_item
elif isinstance(items, (str, int, float, unicode)):
return items
else:
return str(items)
def _check_ip(ip):
try:
ip_o = ip_address(ip.decode('ascii'))
except:
return False
valid_network = saltconfig['ip-range']
for raw_network in valid_network:
network = ip_network(raw_network)
if ip_o in network:
return True
return False
def all_instance():
creds = _encode_all(saltconfig['aws']['creds'])
for cred in creds:
ec2 = boto3.client('ec2', region_name=cred['region_name'],aws_access_key_id=cred['aws_access_key_id'], aws_secret_access_key=cred['aws_secret_access_key'])
result = ec2.describe_instances()['Reservations']
for i in result:
instances = i['Instances']
for instance in instances:
Tags = {}
old_tag = instance.get('Tags')
if not old_tag:
logger.warning('find ec2 instance without tag')
logger.debug(str(instance))
else:
for t in old_tag:
Tags[t['Key']] = t['Value']
instance['Tags'] = Tags
yield _tran_to_json(instance)
def GetServerList():
'''
get dead minions
'''
minions = runner.cmd('manage.down', print_event=False)
return minions
def DelServer(serName):
'''
delete server keys
'''
logger.info('removing minion {} in salt'.format(serName))
wheels.cmd_async({'fun': 'key.delete', 'match': serName})
if __name__ == '__main__':
cache = SimpleCache(50000, expire=3600, namespace='aws')
queue = Queue()
t = Thread(target=lambda q: q.put(GetServerList()), args=(queue,))
t.start()
all_ec2 = set()
logger.info('start freshing cache for aws ec2')
for instance in all_instance():
try:
interfaces = instance.get('NetworkInterfaces')
if not interfaces:
logger.debug('instance without interface')
logger.debug(str(instance))
continue
ip = interfaces[0]['PrivateIpAddress']
if not _check_ip(ip):
logger.warning('{} is not valid private ip'.format(ip))
continue
name = instance['Tags'].get('Name')
if not name:
logger.warning('find ec2 {} without Name Tag'.format(ip))
else:
all_ec2.add(name)
logger.debug('refresh cache for {}'.format(ip))
cache.store_json(ip, instance)
except Exception as e:
logger.error('some error happen for instance {}'.format(instance))
continue
t.join()
minions = queue.get()
for minion in minions:
if minion not in all_ec2:
DelServer(minion)

如何利用pillar中aws的信息

对于大部分任务,我们可以使用salt推荐的jinja模版渲染的方式利用pillar里面的值并进行逻辑控制。
或者是通过state的onlyif,unless等参数进行约束,这种方式可以应用于绝大部分场景。

但是对于这个案例,我们需要对tag进行一些正则匹配,然后用正则匹配的结果反复应用于各种任务。
因此,我们最好一次性将所有的检测条件写好,并在其他的sls中引用这个写好的检测条件。

于是,我又再换一种设计模式

添加正则匹配的state扩展

正则匹配校验是一个同业务无关非常底层的需求。因此我将这部分功能做成state扩展,最大化代码的可重用性。

/srv/salt/_states/valuecheck.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
__virtual__ = 'valuecheck'
def check(name, rule=None, ignorecase=False, dotall=False):
'''
check if value match rule, if not specify rule, check if value exists.
'''
import re
ret = {'name': name, 'changes': {}, 'result': True, 'comment': 'value check pass'}
if rule is None:
if not name:
ret['result'] = False
ret['comment'] = 'value not match'
else:
iflag = re.IGNORECASE if ignorecase else 0
dflag = re.DOTALL if dotall else 0
flags = iflag | dflag
if not re.match(rule, name, flags=flags):
ret['result'] = False
ret['comment'] = 'value not match'
return ret
将校验逻辑写成一个独立的sls

校验逻辑已经属于业务层的代码了,因此放在sls中更为简单且易于阅读。

/srv/salt/system/checks/checktag.sls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
############################
# check if aws tag valid #
############################
{% set tags = salt["pillar.get"]("aws:Tags", {}) %}
tag_creator:
valuecheck.check:
- name: {{ tags.get("Creator", "") }}
- rule: .+
tag_owner:
valuecheck.check:
- name: {{ tags.get("Owner", "") }}
- rule: ^\w+(\.\w+){1,2}$
tag_department:
valuecheck.check:
- name: {{ tags.get("Department", "") }}
- rule: ^(ab||cd|ef)$
- ignorecase: True
tag_project:
valuecheck.check:
- name: {{ tags.get("Project", "") }}
tag_name:
valuecheck.check:
- name: {{ tags.get("Name", "") }}
- rule: ^[t|d]-aws(cn|nx|bj)-.+\d$
tag_checks:
wrap.wrap:
- require:
- valuecheck: tag_creator
- valuecheck: tag_owner
- valuecheck: tag_project
- valuecheck: tag_department
- valuecheck: tag_name

上面的valuecheck.check就是我们刚刚写好的state扩展。
但是我们如果每次做这组校验,都要写一堆条件,比较麻烦。因此最后的wrap.wrap,就是为了解决这个问题的。

warp的代码如下:

/srv/salt/_states/wrap.py

1
2
3
4
5
6
7
8
9
def wrap(name, changes=None, result=True, comment=''):
if changes is None:
changes = dict()
ret = {'name': name,
'changes': changes,
'result': result,
'comment': comment
}
return ret

如代码所见,这个函数的目的是在满足state模块接口的情况下,实现一个极为简单空逻辑,通过传入参数的不同,直接决定state的结果。我们利用这个不执行任何有意义逻辑的任务,附加上require,就可以实现简化的目的了

在业务逻辑中使用校验逻辑

终于到最后一步了。我祛除了具体的业务逻辑,代之以一个简单的例子。

somejob.sls

1
2
3
4
5
6
7
8
include:
- system.checks.checktag
somejob:
job.job:
- args
- require:
- wrap: tag_checks

在这个任务的sls中,我们通过include引用刚刚写好标签检测任务。并将这个检测点附加在对应的执行节点上。如果我们有很多任务都依赖于这个检测,那么通过highstate调用的检测只会执行一次,不会重复执行。

总结

最后针对这个案例,我们进行一下技术总结:

  • 可以通过pillar + renderer 的方式从外部系统导入数据到salt中,但需要使用缓存来降调用次数
  • 通过state扩展的方式,实现一些控制逻辑,这样代码会比使用jinja渲染更为清晰,同时避免执行顺序不通导致的逻辑错误
  • 将真实的业务逻辑以及基础代码分离解耦,让业务逻辑更为清晰且易于维护