forked from openstack/python-openstackclient
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
183 lines (154 loc) · 5.52 KB
/
utils.py
File metadata and controls
183 lines (154 loc) · 5.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from osc_lib import exceptions
from openstackclient.i18n import _
# Transform compute security group rule for display.
def transform_compute_security_group_rule(sg_rule):
info = {}
info.update(sg_rule)
from_port = info.pop('from_port')
to_port = info.pop('to_port')
if isinstance(from_port, int) and isinstance(to_port, int):
port_range = {'port_range': f"{from_port}:{to_port}"}
elif from_port is None and to_port is None:
port_range = {'port_range': ""}
else:
port_range = {'port_range': f"{from_port}:{to_port}"}
info.update(port_range)
if 'cidr' in info['ip_range']:
info['ip_range'] = info['ip_range']['cidr']
else:
info['ip_range'] = ''
if info['ip_protocol'] is None:
info['ip_protocol'] = ''
elif info['ip_protocol'].lower() == 'icmp':
info['port_range'] = ''
group = info.pop('group')
if 'name' in group:
info['remote_security_group'] = group['name']
else:
info['remote_security_group'] = ''
return info
def str2bool(strbool):
if strbool is None:
return None
return strbool.lower() == 'true'
def str2list(strlist):
result = []
if strlist:
result = strlist.split(';')
return result
def str2dict(strdict: str) -> dict[str, str]:
"""Convert key1:value1;key2:value2;... string into dictionary.
:param strdict: string in the form of key1:value1;key2:value2
"""
result: dict[str, str] = {}
if not strdict:
return result
i = 0
kvlist = []
for kv in strdict.split(';'):
if ':' in kv:
kvlist.append(kv)
i += 1
elif i == 0:
msg = _("missing value for key '%s'")
raise exceptions.CommandError(msg % kv)
else:
kvlist[i - 1] = f"{kvlist[i - 1]};{kv}"
for kv in kvlist:
key, sep, value = kv.partition(':')
result[key] = value
return result
def format_security_group_rule_show(obj):
data = transform_compute_security_group_rule(obj)
return zip(*sorted(data.items()))
def format_network_port_range(rule):
# Display port range or ICMP type and code. For example:
# - ICMP type: 'type=3'
# - ICMP type and code: 'type=3:code=0'
# - ICMP code: Not supported
# - Matching port range: '443:443'
# - Different port range: '22:24'
# - Single port: '80:80'
# - No port range: ''
port_range = ''
if is_icmp_protocol(rule['protocol']):
if rule['port_range_min']:
port_range += 'type=' + str(rule['port_range_min'])
if rule['port_range_max']:
port_range += ':code=' + str(rule['port_range_max'])
elif rule['port_range_min'] or rule['port_range_max']:
port_range_min = str(rule['port_range_min'])
port_range_max = str(rule['port_range_max'])
if rule['port_range_min'] is None:
port_range_min = port_range_max
if rule['port_range_max'] is None:
port_range_max = port_range_min
port_range = port_range_min + ':' + port_range_max
return port_range
def format_remote_ip_prefix(rule):
remote_ip_prefix = rule['remote_ip_prefix']
if remote_ip_prefix is None:
ethertype = rule['ether_type']
if ethertype == 'IPv4':
remote_ip_prefix = '0.0.0.0/0'
elif ethertype == 'IPv6':
remote_ip_prefix = '::/0'
return remote_ip_prefix
def convert_ipvx_case(string):
if string.lower() == 'ipv4':
return 'IPv4'
if string.lower() == 'ipv6':
return 'IPv6'
return string
def is_icmp_protocol(protocol):
# NOTE(rtheis): Neutron has deprecated protocol icmpv6.
# However, while the OSC CLI doesn't document the protocol,
# the code must still handle it. In addition, handle both
# protocol names and numbers.
if protocol in ['icmp', 'icmpv6', 'ipv6-icmp', '1', '58']:
return True
else:
return False
def convert_to_lowercase(string):
return string.lower()
def get_protocol(parsed_args, default_protocol='any'):
protocol = default_protocol
if parsed_args.protocol is not None:
protocol = parsed_args.protocol
if hasattr(parsed_args, "proto") and parsed_args.proto is not None:
protocol = parsed_args.proto
if protocol == 'any':
protocol = None
return protocol
def get_ethertype(parsed_args, protocol):
ethertype = 'IPv4'
if parsed_args.ethertype is not None:
ethertype = parsed_args.ethertype
elif is_ipv6_protocol(protocol):
ethertype = 'IPv6'
return ethertype
def is_ipv6_protocol(protocol):
# NOTE(rtheis): Neutron has deprecated protocol icmpv6.
# However, while the OSC CLI doesn't document the protocol,
# the code must still handle it. In addition, handle both
# protocol names and numbers.
if (
protocol is not None
and protocol.startswith('ipv6-')
or protocol in ['icmpv6', '41', '43', '44', '58', '59', '60']
):
return True
else:
return False