文章

apollo_util

授权指定用户,统一指定权限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#! /usr/bin/python
import json

import requests
import logging
import argparse

# Third-party (consumer) token, as a ``(token, display name)`` pair that is
# unpacked into ``ApolloUtil.app_manage_by_thirdparty``.  The value is redacted
# in this copy of the script, so the assignment is left disabled:

# captain_token = ("******", "token")

# Portal account used for the session login and the portal address.
# NOTE(review): the password is a hard-coded (here redacted) literal; keep real
# credentials out of version control.
apollo_user = "apollo"
apollo_password = "******"
base_url = "https://***.abc.com"

# Accounts that receive permissions, as ``(user id, display name)`` pairs.
# The display name is only used in log messages.
user_list = [
    ("156****7608", "EchoCourage"),
    ("177****3319", "zhangSan")
]

# Browser-like request headers sent with every portal request.
headers = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
    'Cache-Control': 'max-age=0',
    'Connection': 'keep-alive',
    'Content-Type': 'application/x-www-form-urlencoded',
    'Origin': base_url,
    'Referer': base_url,
    'Upgrade-Insecure-Requests': '1',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.34'

}

# Form fields posted to the portal's ``/signin`` endpoint.
login_data = {
    'username': apollo_user,
    'password': apollo_password,
    'login-submit': '登 录'
}

LOG_FORMAT = "%(asctime)s - %(levelname)s - %(lineno)d - %(message)s"
# Year/month/day; the original pattern lacked the separator between year and
# month and produced timestamps such as "202304/18".
DATE_FORMAT = "%Y/%m/%d %H:%M:%S"
# The misspelt name is kept because it is part of the module's namespace.
LOG_LEVE = logging.INFO
logging.basicConfig(level=LOG_LEVE, format=LOG_FORMAT, datefmt=DATE_FORMAT)


class ApolloUtil:
    """Small client for the Apollo portal's HTTP endpoints.

    Creating an instance opens a ``requests`` session and logs in with the
    module-level ``login_data``.  The remaining methods create apps and
    clusters and grant roles through that session.  Unrecoverable failures
    terminate the process with exit status 1.
    """

    def __init__(self, env="PRO") -> None:
        """Open a session, log in, and remember the target environment.

        :param env: environment name used in cluster and namespace URLs
        """
        self.session = requests.session()
        # Work on a private copy: ``_login`` later rewrites Content-Type, and
        # mutating the shared module-level dict would make a second instance
        # send its login form with a JSON content type.
        self.session.headers = dict(headers)
        self.base_url = base_url.strip().strip("/")
        self.env = env
        self._login()

    @staticmethod
    def _error_message(r):
        """Return the ``message`` field of an error response, or its raw text.

        Falls back to ``r.text`` when the body is not JSON or carries no
        ``message`` entry, so error reporting itself cannot raise.
        """
        try:
            return str(r.json()["message"])
        except (ValueError, KeyError, TypeError):
            return r.text

    def _login(self):
        """Post the login form and verify the session by fetching ``/user``.

        :raises SystemExit: with status 1 when ``/user`` does not answer 200
        """
        # Use the normalised address so a trailing slash in ``base_url`` does
        # not yield URLs such as ``https://host//signin``.
        _ = self.session.post(self.base_url + "/signin", data=login_data)
        if self.session.get(self.base_url + "/user").status_code != 200:
            logging.error("login failed")
            raise SystemExit(1)
        logging.info("login success")
        # Every later call in this class talks JSON to the portal.
        self.session.headers['Content-Type'] = 'application/json'

    def get_app_ns(self, app):
        """List the namespace names of ``app`` in the ``default`` cluster.

        :param app: app id
        :return: list of namespace names; empty when the request fails
        """
        # Query the environment this client was created for instead of a
        # hard-coded "PRO" (which remains the default environment).
        url = self.base_url + f"/apps/{app}/envs/{self.env}/clusters/default/namespaces"
        r = self.session.get(url)
        if r.status_code != 200:
            logging.error(r.content)
            return []
        return [x["baseInfo"]["namespaceName"] for x in r.json()]

    def app_create(self, app):
        """Create ``app``; an already existing app is only reported as a warning.

        :param app: app id, also used as the app name
        :raises SystemExit: with status 1 on any other failure
        """
        data = {"appId": app, "name": app, "orgId": "DevCenter",
                "orgName": "研发部", "ownerName": "apollo", "admins": []}
        url = self.base_url + "/apps"
        r = self.session.post(url, json=data)
        if r.status_code != 200:
            message = self._error_message(r)
            if "App already exists." in message:
                logging.warning(f"app <{app}> 已存在")
                return
            logging.error(message)
            raise SystemExit(1)
        logging.info(f"app <{app}> 创建成功")

    def app_cluster_create(self, app, cluster):
        """Create ``cluster`` for ``app`` in this client's environment.

        An "already exists" answer is logged as a warning and tolerated.

        :param app: app id
        :param cluster: cluster name
        :raises SystemExit: with status 1 on any other failure
        """
        data = {"name": cluster, "appId": app}
        url = self.base_url + "/apps/%s/envs/%s/clusters" % (app, self.env)
        r = self.session.post(url, json=data)
        if r.status_code != 200:
            message = self._error_message(r)
            if "already exists" in message:
                logging.warning(message)
                return
            logging.error(message)
            raise SystemExit(1)
        logging.info(f"app <{app}> cluster <{cluster}> 创建成功")

    def app_manage_by_thirdparty(self, app, token, name=""):
        """Assign the app role for ``app`` to a third-party consumer token.

        A failure is logged and otherwise ignored (best effort).

        :param app: app id
        :param token: consumer token placed in the request URL
        :param name: label for ``token`` in log output; defaults to the token
        """
        if not name:
            name = token
        url = self.base_url + f"/consumers/{token}/assign-role?type=AppRole"
        r = self.session.post(url, json={"appId": app})
        if r.status_code != 200:
            logging.error(self._error_message(r))
            # Do not fall through to the success message below.
            return
        logging.info(f"app <{app}>  managerRole 与 三方token <{name}> 关联成功")

    def _app_role_base(self, action, app, ns, user, name=""):
        """Grant one role on an app or namespace to ``user``.

        :param action: ``"Modify"`` or ``"Release"`` (namespace roles) or
            ``"Manage"`` (the app's Master role)
        :param app: app id
        :param ns: namespace name; not used in the URL for ``"Manage"``
        :param user: user id, sent as the raw request body
        :param name: label for ``user`` in log output; defaults to ``user``
        :raises SystemExit: with status 1 for an unsupported action or a
            failed grant; an "already granted" answer is tolerated
        """
        if action not in ["Modify", "Release", "Manage"]:
            logging.error("action 只支持 Modify,Release, Manage方法")
            raise SystemExit(1)
        if not name:
            name = user
        if action == "Manage":
            url = self.base_url + f"/apps/{app}/roles/Master"
        else:
            url = self.base_url + f"/apps/{app}/namespaces/{ns}/roles/{action}Namespace"
        r = self.session.post(url, data=str(user))
        if r.status_code != 200:
            message = self._error_message(r)
            # The portal answers with a message containing "已授权" when the
            # user already holds the role.
            if "已授权" in message:
                logging.warning(f"app <{app}>-{ns}  {action}权限 与 用户 <{name}> 已关联")
                return
            # A failed grant aborts the run, so report it as an error.
            logging.error(f"app <{app}>-{ns}  {action}权限 与 用户 <{name}> 关联失败,<{message}>")
            raise SystemExit(1)
        logging.info(f"app <{app}>-{ns}  {action}权限 与 用户 <{name}> 关联成功")

    def app_manage_role(self, app, user, name=""):
        """Grant the app-level management (Master) role.

        :param app: app id
        :param user: user id
        :param name: display name of ``user`` for log output
        :return: None
        """
        self._app_role_base("Manage", app, "", user, name)

    def app_ns_role_of_modify(self, app, ns, user, name=""):
        """Grant the Modify role on namespace ``ns`` of ``app`` to ``user``."""
        self._app_role_base("Modify", app, ns, user, name)

    def app_ns_role_of_release(self, app, ns, user, name=""):
        """Grant the Release role on namespace ``ns`` of ``app`` to ``user``."""
        self._app_role_base("Release", app, ns, user, name)


def mange_role(apollo_class, app, ns):
    """Grant the namespace-level roles on ``ns`` of ``app``.

    Each entry of the module-level ``user_list`` receives the Modify role and
    then the Release role.  Finally the ``readonly`` account is passed to the
    Release grant as well.

    :param apollo_class: client object exposing ``app_ns_role_of_modify`` and
        ``app_ns_role_of_release`` (``main`` passes an ``ApolloUtil`` instance)
    :param app: app id
    :param ns: namespace name
    """
    client = apollo_class
    # Modify first, then Release, for every configured account.
    for account in user_list:
        for grant in (client.app_ns_role_of_modify, client.app_ns_role_of_release):
            grant(app, ns, *account)
    # The "readonly" account goes through the Release grant.
    client.app_ns_role_of_release(app, ns, "readonly", "readonly")



def main(app, ns="", flag=False):
    """Create ``app`` with its clusters and hand out all configured permissions.

    Steps, in order: create the app, create the ``pre`` and ``gray`` clusters,
    bind the third-party token (when one is configured), grant the app-level
    management role to every entry of ``user_list``, then grant the
    namespace-level roles on the selected namespaces.

    :param app: app id
    :param ns: namespace to authorise when ``flag`` is false
    :param flag: when true, authorise every namespace reported by
        ``ApolloUtil.get_app_ns`` instead of just ``ns``
    """
    client = ApolloUtil()
    # Initialisation.
    client.app_create(app)
    for cluster in ("pre", "gray"):
        client.app_cluster_create(app, cluster)
    # ``captain_token`` may be absent from the module (its assignment is
    # disabled in the configuration section); referencing it directly would
    # raise NameError after the app and clusters were already created.
    token = globals().get("captain_token")
    if token:
        client.app_manage_by_thirdparty(app, *token)
    else:
        logging.warning("captain_token is not configured; skipping third-party token binding")
    # App-level management permission.
    for account in user_list:
        client.app_manage_role(app, *account)
    ns_list = client.get_app_ns(app) if flag else [ns]
    for _ns in ns_list:
        mange_role(client, app, _ns)



if __name__ == '__main__':
    # Command-line entry point: parse the service name, the namespace and the
    # "all namespaces" switch, then hand them to main().
    # A '-c' switch (create the service only when given) was drafted here but
    # is not registered.
    cli = argparse.ArgumentParser(description='apollo config sync script')
    option_specs = (
        ('-svc', {'required': True, 'help': '服务名字'}),
        ('-ns', {'required': False, 'default': "application", 'help': '命名空间'}),
        ('-all', {'action': 'store_true', 'help': '指定则授权所有命名空间'}),
    )
    for option, settings in option_specs:
        cli.add_argument(option, **settings)

    options = cli.parse_args()
    main(options.svc, options.ns, options.all)
本文由作者按照 CC BY 4.0 进行授权