-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathp4runtime.py
More file actions
356 lines (316 loc) · 13.2 KB
/
p4runtime.py
File metadata and controls
356 lines (316 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# Copyright 2019 Barefoot Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from functools import wraps
import google.protobuf.text_format
from google.rpc import status_pb2, code_pb2
import grpc
import logging
import queue
import sys
import threading
from iterators import TimeoutIterator
from typing import NamedTuple
from p4.v1 import p4runtime_pb2
from p4.v1 import p4runtime_pb2_grpc
class P4RuntimeErrorFormatException(Exception):
    """Signals that a gRPC error status could not be decoded as P4Runtime errors.

    Raised by P4RuntimeErrorIterator when the binary status details are
    missing, empty, or contain an entry that is not a p4.Error message.
    """

    def __init__(self, message):
        # Exactly one argument is accepted: the human-readable description,
        # which becomes the only element of ``args``.
        super().__init__(message)
class P4RuntimeErrorIterator:
    """Iterate over the p4.Error messages in a gRPC error Status object.

    The iterator yields ``(index, p4_error)`` tuples, where ``index`` is the
    position of the entry in the status details list and ``p4_error`` is the
    unpacked ``p4runtime_pb2.Error``. Entries whose canonical code is OK are
    skipped, so only actual failures are produced.

    Raises:
        P4RuntimeErrorFormatException: from the constructor if the error
            carries no "grpc-status-details-bin" metadata or its details
            list is empty; from ``next()`` if an entry cannot be unpacked as
            a p4.Error.
    """

    def __init__(self, grpc_error):
        assert grpc_error.code() == grpc.StatusCode.UNKNOWN
        self.grpc_error = grpc_error

        error = None
        # The gRPC Python package does not have a convenient way to access the
        # binary details for the error: they are treated as trailing metadata.
        for meta in self.grpc_error.trailing_metadata():
            if meta[0] == "grpc-status-details-bin":
                error = status_pb2.Status()
                error.ParseFromString(meta[1])
                break
        if error is None:
            raise P4RuntimeErrorFormatException("No binary details field")
        if len(error.details) == 0:
            raise P4RuntimeErrorFormatException(
                "Binary details field has empty Any details repeated field")

        self.errors = error.details
        self.idx = 0

    def __iter__(self):
        return self

    def __next__(self):
        while self.idx < len(self.errors):
            idx = self.idx
            p4_error = p4runtime_pb2.Error()
            if not self.errors[idx].Unpack(p4_error):
                # The cursor is deliberately left on the malformed entry.
                raise P4RuntimeErrorFormatException(
                    "Cannot convert Any message to p4.Error")
            # Advance before deciding whether to skip: the previous code hit
            # `continue` on an OK entry without moving the cursor, which made
            # this loop spin forever on the first successful update.
            self.idx = idx + 1
            if p4_error.canonical_code == code_pb2.OK:
                continue
            return idx, p4_error
        raise StopIteration
# P4Runtime reports a failed write batch through a 3-level message. If the
# grpc.RpcError were not wrapped in a dedicated exception, only the first level
# would be printed, which is rarely helpful. This exception extracts the nested
# errors (one per operation of the batch) so that both the error code and the
# user-facing message can be shown. See the P4Runtime documentation for more
# details on error reporting.
class P4RuntimeWriteException(Exception):
    """Write RPC failure carrying the per-update p4.Error messages.

    ``errors`` is a list of ``(index, p4_error)`` tuples as produced by
    P4RuntimeErrorIterator. A P4RuntimeErrorFormatException raised while
    decoding the gRPC error is not handled here and propagates to the caller.
    """

    def __init__(self, grpc_error):
        assert grpc_error.code() == grpc.StatusCode.UNKNOWN
        super().__init__()
        # Draining the iterator collects every reported error up front.
        self.errors = list(P4RuntimeErrorIterator(grpc_error))

    def __str__(self):
        parts = ["Error(s) during Write:\n"]
        for position, failure in self.errors:
            # Translate the numeric canonical code into its enum name.
            code_desc = code_pb2._CODE.values_by_number[failure.canonical_code]
            parts.append("\t* At index {}: {}, '{}'\n".format(
                position, code_desc.name, failure.message))
        return "".join(parts)
class P4RuntimeException(Exception):
    """Generic wrapper around a gRPC error raised by a P4Runtime RPC.

    The original error is kept in ``grpc_error``; the string form shows the
    name of its status code followed by its details text.
    """

    def __init__(self, grpc_error):
        super().__init__()
        self.grpc_error = grpc_error

    def __str__(self):
        status_name = self.grpc_error.code().name
        details = self.grpc_error.details()
        return f"P4Runtime RPC error ({status_name}): {details}"
def parse_p4runtime_write_error(f):
    """Decorator converting Write RPC failures into P4RuntimeWriteException.

    A grpc.RpcError whose status code is UNKNOWN is replaced by a
    P4RuntimeWriteException built from it (the original exception context is
    suppressed). Any other grpc.RpcError is re-raised as is, and the wrapped
    function's return value is passed through untouched.
    """
    @wraps(f)
    def handle(*args, **kwargs):
        try:
            result = f(*args, **kwargs)
        except grpc.RpcError as rpc_error:
            if rpc_error.code() == grpc.StatusCode.UNKNOWN:
                raise P4RuntimeWriteException(rpc_error) from None
            raise rpc_error
        return result
    return handle
def parse_p4runtime_error(f):
    """Decorator converting any grpc.RpcError into a P4RuntimeException.

    The wrapped function's return value is passed through untouched; when it
    raises grpc.RpcError, a P4RuntimeException wrapping that error is raised
    instead, with the original exception context suppressed.
    """
    @wraps(f)
    def handle(*args, **kwargs):
        try:
            result = f(*args, **kwargs)
        except grpc.RpcError as rpc_error:
            raise P4RuntimeException(rpc_error) from None
        return result
    return handle
class SSLOptions(NamedTuple):
    """Channel security settings read by P4RuntimeClient when it connects."""
    # When true, P4RuntimeClient opens an insecure gRPC channel and the
    # remaining fields are not read.
    insecure: bool
    # Path to a PEM file with the root certificates; when None, no root
    # certificates are passed to gRPC (which then picks its default ones).
    cacert: str = None
    # Path to a PEM file with the client certificate chain; None to omit it.
    cert: str = None
    # Path to a PEM file with the client private key; None to omit it.
    key: str = None
def read_pem_file(path):
    """Return the raw bytes stored in the PEM file at ``path``.

    If the file cannot be opened or read for any reason, a critical message
    is logged and the process exits with status 1.
    """
    try:
        with open(path, 'rb') as pem_file:
            contents = pem_file.read()
    except Exception:
        logging.critical("Cannot read from PEM file '{}'".format(path))
        sys.exit(1)
    return contents
class P4RuntimeClient:
    """Client for a single P4Runtime device.

    Creating an instance opens the gRPC channel (insecure or TLS, depending on
    ``ssl_options``), creates the P4Runtime stub, starts the StreamChannel
    with its background receive thread, and sends the arbitration request.
    """

    def __init__(self, device_id, grpc_addr, election_id, role_name=None, ssl_options=None):
        """Connect to the server at ``grpc_addr`` and open the stream.

        ``election_id`` is indexed as ``[0]`` (high) and ``[1]`` (low).
        ``ssl_options`` defaults to an insecure channel when None. The process
        exits with status 1 if channel creation raises.
        """
        self.device_id = device_id
        self.election_id = election_id
        self.role_name = role_name
        self.ssl_options = SSLOptions(True) if ssl_options is None else ssl_options

        logging.debug(f"Connecting to device {device_id} at {grpc_addr}")
        if self.ssl_options.insecure:
            try:
                logging.debug("Using insecure channel")
                self.channel = grpc.insecure_channel(grpc_addr)
            except Exception:
                logging.critical("Failed to connect to P4Runtime server")
                sys.exit(1)
        else:
            # Root certificates are retrieved from a default location chosen by
            # the gRPC runtime unless the user provides custom certificates.
            tls = self.ssl_options
            root_certificates = (
                read_pem_file(tls.cacert) if tls.cacert is not None else None)
            certificate_chain = (
                read_pem_file(tls.cert) if tls.cert is not None else None)
            private_key = (
                read_pem_file(tls.key) if tls.key is not None else None)
            creds = grpc.ssl_channel_credentials(
                root_certificates=root_certificates,
                private_key=private_key,
                certificate_chain=certificate_chain)
            try:
                self.channel = grpc.secure_channel(grpc_addr, creds)
            except Exception:
                logging.critical("Failed to connect to P4Runtime server")
                sys.exit(1)

        self.stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
        self.set_up_stream()

    def _stamp_request(self, req):
        # Fill in the fields shared by all mutating requests: device id,
        # optional role, and the two halves of the election id.
        req.device_id = self.device_id
        if self.role_name is not None:
            req.role = self.role_name
        req.election_id.high = self.election_id[0]
        req.election_id.low = self.election_id[1]

    def set_up_stream(self):
        """Create the stream queues, start the receive thread and handshake.

        Incoming stream messages are sorted into ``stream_in_q`` by the field
        they carry; messages carrying none of the known fields go to the
        "unknown" queue.
        """
        known_types = ("arbitration", "packet", "digest", "idle_timeout_notification")

        self.signal_q = queue.Queue()
        self.stream_out_q = queue.Queue()
        # One inbound queue per message type, plus a catch-all.
        self.stream_in_q = {
            name: queue.Queue() for name in known_types + ("unknown",)}

        def stream_req_iterator():
            # Hand queued requests to gRPC until a None marker is dequeued.
            while True:
                request = self.stream_out_q.get()
                if request is None:
                    return
                yield request

        def stream_recv_wrapper(stream):
            @parse_p4runtime_error
            def stream_recv():
                responses = TimeoutIterator(stream, timeout=1)
                for response in responses:
                    # Anything placed on signal_q asks this loop to stop.
                    if not self.signal_q.empty():
                        break
                    # The sentinel only means that the 1s timeout elapsed.
                    if response == responses.get_sentinel():
                        continue
                    msg_type = next(
                        (name for name in known_types if response.HasField(name)),
                        "unknown")
                    self.stream_in_q[msg_type].put(response)

            try:
                stream_recv()
            except P4RuntimeException as e:
                logging.critical("StreamChannel error, closing stream")
                logging.critical(e)
                # Wake up every consumer blocked on an inbound queue.
                for in_q in self.stream_in_q.values():
                    in_q.put(None)

        self.stream = self.stub.StreamChannel(stream_req_iterator())
        self.stream_recv_thread = threading.Thread(
            target=stream_recv_wrapper, args=(self.stream,))
        self.stream_recv_thread.start()
        self.handshake()

    def handshake(self):
        """Send the arbitration request and wait up to 2s for the reply.

        Exits the process with status 1 when no reply is obtained. The client
        is considered primary when the reply's status code is OK; otherwise a
        read-only notice is printed.
        """
        request = p4runtime_pb2.StreamMessageRequest()
        request.arbitration.device_id = self.device_id
        request.arbitration.election_id.high = self.election_id[0]
        request.arbitration.election_id.low = self.election_id[1]
        if self.role_name is not None:
            request.arbitration.role.name = self.role_name
        self.stream_out_q.put(request)

        reply = self.get_stream_packet("arbitration", timeout=2)
        if reply is None:
            logging.critical("Failed to establish session with server")
            sys.exit(1)

        is_primary = reply.arbitration.status.code == code_pb2.OK
        client_kind = 'primary' if is_primary else 'backup'
        logging.debug(f"Session established, client is '{client_kind}'")
        if not is_primary:
            print("You are not the primary client, you only have read access to the server")

    def get_stream_packet(self, type_, timeout=1):
        """Return the next queued stream message of kind ``type_``.

        Returns None when ``type_`` is not a known queue name (a notice is
        printed) or when nothing arrives within ``timeout`` seconds. A None
        placed on the queue by the receive thread or tear_down is returned
        as is.
        """
        if type_ not in self.stream_in_q:
            print("Unknown stream type '{}'".format(type_))
            return None
        try:
            return self.stream_in_q[type_].get(timeout=timeout)
        except queue.Empty:  # timeout expired
            return None

    @parse_p4runtime_error
    def get_p4info(self):
        """Fetch the forwarding pipeline config and return its P4Info."""
        logging.debug("Retrieving P4Info file")
        request = p4runtime_pb2.GetForwardingPipelineConfigRequest()
        request.device_id = self.device_id
        request.response_type = \
            p4runtime_pb2.GetForwardingPipelineConfigRequest.P4INFO_AND_COOKIE
        return self.stub.GetForwardingPipelineConfig(request).config.p4info

    @parse_p4runtime_error
    def set_fwd_pipe_config(self, p4info_path, bin_path):
        """Push a pipeline config with action VERIFY_AND_COMMIT.

        ``p4info_path`` is read as protobuf text format and ``bin_path`` as
        raw bytes. A text-format parse error is logged and re-raised.
        """
        logging.debug("Setting forwarding pipeline config")
        req = p4runtime_pb2.SetForwardingPipelineConfigRequest()
        self._stamp_request(req)
        req.action = p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT
        with open(p4info_path, 'r') as p4info_file, open(bin_path, 'rb') as bin_file:
            try:
                google.protobuf.text_format.Merge(p4info_file.read(), req.config.p4info)
            except google.protobuf.text_format.ParseError:
                logging.error("Error when parsing P4Info")
                raise
            req.config.p4_device_config = bin_file.read()
        return self.stub.SetForwardingPipelineConfig(req)

    def tear_down(self):
        """Stop the stream, join the receive thread and close the channel."""
        if self.stream_out_q:
            logging.debug("Cleaning up stream")
            # None makes the request generator finish.
            self.stream_out_q.put(None)
        if self.stream_in_q:
            for in_q in self.stream_in_q.values():
                in_q.put(None)
        if self.stream_recv_thread:
            # The receive loop stops once it sees a non-empty signal queue.
            self.signal_q.put(None)
            self.stream_recv_thread.join()
        self.channel.close()
        del self.channel  # avoid a race condition if channel deleted when process terminates

    @parse_p4runtime_write_error
    def write(self, req):
        """Stamp ``req`` with device, role and election id, then send it."""
        self._stamp_request(req)
        return self.stub.Write(req)

    @parse_p4runtime_write_error
    def write_update(self, update):
        """Send a WriteRequest containing the single ``update``."""
        req = p4runtime_pb2.WriteRequest()
        self._stamp_request(req)
        req.updates.extend([update])
        return self.stub.Write(req)

    # Decorator is useless here: in case of server error, the exception is raised during the
    # iteration (when next() is called).
    @parse_p4runtime_error
    def read_one(self, entity):
        """Issue a Read for the single ``entity`` and return the stub's result."""
        req = p4runtime_pb2.ReadRequest()
        if self.role_name is not None:
            req.role = self.role_name
        req.device_id = self.device_id
        req.entities.extend([entity])
        return self.stub.Read(req)

    @parse_p4runtime_error
    def api_version(self):
        """Return the P4Runtime API version reported by the server."""
        reply = self.stub.Capabilities(p4runtime_pb2.CapabilitiesRequest())
        return reply.p4runtime_api_version