GNU Linux-libre 4.14.266-gnu1
[releases.git] / tools / testing / selftests / tc-testing / tdc.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: GPL-2.0
3
4 """
5 tdc.py - Linux tc (Traffic Control) unit test driver
6
7 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8 """
9
10 import re
11 import os
12 import sys
13 import argparse
14 import json
15 import subprocess
16 from collections import OrderedDict
17 from string import Template
18
19 from tdc_config import *
20 from tdc_helper import *
21
22
23 USE_NS = True
24
25
26 def replace_keywords(cmd):
27     """
28     For a given executable command, substitute any known
29     variables contained within NAMES with the correct values
30     """
31     tcmd = Template(cmd)
32     subcmd = tcmd.safe_substitute(NAMES)
33     return subcmd
34
35
36 def exec_cmd(command, nsonly=True):
37     """
38     Perform any required modifications on an executable command, then run
39     it in a subprocess and return the results.
40     """
41     if (USE_NS and nsonly):
42         command = 'ip netns exec $NS ' + command
43
44     if '$' in command:
45         command = replace_keywords(command)
46
47     proc = subprocess.Popen(command,
48         shell=True,
49         stdout=subprocess.PIPE,
50         stderr=subprocess.PIPE)
51     (rawout, serr) = proc.communicate()
52
53     if proc.returncode != 0:
54         foutput = serr.decode("utf-8")
55     else:
56         foutput = rawout.decode("utf-8")
57
58     proc.stdout.close()
59     proc.stderr.close()
60     return proc, foutput
61
62
63 def prepare_env(cmdlist):
64     """
65     Execute the setup/teardown commands for a test case. Optionally
66     terminate test execution if the command fails.
67     """
68     for cmdinfo in cmdlist:
69         if (type(cmdinfo) == list):
70             exit_codes = cmdinfo[1:]
71             cmd = cmdinfo[0]
72         else:
73             exit_codes = [0]
74             cmd = cmdinfo
75
76         if (len(cmd) == 0):
77             continue
78
79         (proc, foutput) = exec_cmd(cmd)
80
81         if proc.returncode not in exit_codes:
82             print
83             print("Could not execute:")
84             print(cmd)
85             print("\nError message:")
86             print(foutput)
87             print("\nAborting test run.")
88             ns_destroy()
89             exit(1)
90
91
92 def test_runner(filtered_tests, args):
93     """
94     Driver function for the unit tests.
95
96     Prints information about the tests being run, executes the setup and
97     teardown commands and the command under test itself. Also determines
98     success/failure based on the information in the test case and generates
99     TAP output accordingly.
100     """
101     testlist = filtered_tests
102     tcount = len(testlist)
103     index = 1
104     tap = str(index) + ".." + str(tcount) + "\n"
105
106     for tidx in testlist:
107         result = True
108         tresult = ""
109         if "flower" in tidx["category"] and args.device == None:
110             continue
111         print("Test " + tidx["id"] + ": " + tidx["name"])
112         prepare_env(tidx["setup"])
113         (p, procout) = exec_cmd(tidx["cmdUnderTest"])
114         exit_code = p.returncode
115
116         if (exit_code != int(tidx["expExitCode"])):
117             result = False
118             print("exit:", exit_code, int(tidx["expExitCode"]))
119             print(procout)
120         else:
121             match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
122             (p, procout) = exec_cmd(tidx["verifyCmd"])
123             match_index = re.findall(match_pattern, procout)
124             if len(match_index) != int(tidx["matchCount"]):
125                 result = False
126
127         if result == True:
128             tresult += "ok "
129         else:
130             tresult += "not ok "
131         tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
132
133         if result == False:
134             tap += procout
135
136         prepare_env(tidx["teardown"])
137         index += 1
138
139     return tap
140
141
142 def ns_create():
143     """
144     Create the network namespace in which the tests will be run and set up
145     the required network devices for it.
146     """
147     if (USE_NS):
148         cmd = 'ip netns add $NS'
149         exec_cmd(cmd, False)
150         cmd = 'ip link add $DEV0 type veth peer name $DEV1'
151         exec_cmd(cmd, False)
152         cmd = 'ip link set $DEV1 netns $NS'
153         exec_cmd(cmd, False)
154         cmd = 'ip link set $DEV0 up'
155         exec_cmd(cmd, False)
156         cmd = 'ip -n $NS link set $DEV1 up'
157         exec_cmd(cmd, False)
158         cmd = 'ip link set $DEV2 netns $NS'
159         exec_cmd(cmd, False)
160         cmd = 'ip -n $NS link set $DEV2 up'
161         exec_cmd(cmd, False)
162
163
164 def ns_destroy():
165     """
166     Destroy the network namespace for testing (and any associated network
167     devices as well)
168     """
169     if (USE_NS):
170         cmd = 'ip netns delete $NS'
171         exec_cmd(cmd, False)
172
173
174 def has_blank_ids(idlist):
175     """
176     Search the list for empty ID fields and return true/false accordingly.
177     """
178     return not(all(k for k in idlist))
179
180
181 def load_from_file(filename):
182     """
183     Open the JSON file containing the test cases and return them as an
184     ordered dictionary object.
185     """
186     with open(filename) as test_data:
187         testlist = json.load(test_data, object_pairs_hook=OrderedDict)
188     idlist = get_id_list(testlist)
189     if (has_blank_ids(idlist)):
190         for k in testlist:
191             k['filename'] = filename
192     return testlist
193
194
195 def args_parse():
196     """
197     Create the argument parser.
198     """
199     parser = argparse.ArgumentParser(description='Linux TC unit tests')
200     return parser
201
202
203 def set_args(parser):
204     """
205     Set the command line arguments for tdc.
206     """
207     parser.add_argument('-p', '--path', type=str,
208                         help='The full path to the tc executable to use')
209     parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
210                         help='Run tests only from the specified category, or if no category is specified, list known categories.')
211     parser.add_argument('-f', '--file', type=str,
212                         help='Run tests from the specified file')
213     parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
214                         help='List all test cases, or those only within the specified category')
215     parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
216                         help='Display the test case with specified id')
217     parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
218                         help='Execute the single test case with specified ID')
219     parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
220                         help='Generate ID numbers for new test cases')
221     parser.add_argument('-d', '--device',
222                         help='Execute the test case in flower category')
223     return parser
224
225
226 def check_default_settings(args):
227     """
228     Process any arguments overriding the default settings, and ensure the
229     settings are correct.
230     """
231     # Allow for overriding specific settings
232     global NAMES
233
234     if args.path != None:
235          NAMES['TC'] = args.path
236     if args.device != None:
237          NAMES['DEV2'] = args.device
238     if not os.path.isfile(NAMES['TC']):
239         print("The specified tc path " + NAMES['TC'] + " does not exist.")
240         exit(1)
241
242
243 def get_id_list(alltests):
244     """
245     Generate a list of all IDs in the test cases.
246     """
247     return [x["id"] for x in alltests]
248
249
250 def check_case_id(alltests):
251     """
252     Check for duplicate test case IDs.
253     """
254     idl = get_id_list(alltests)
255     return [x for x in idl if idl.count(x) > 1]
256
257
258 def does_id_exist(alltests, newid):
259     """
260     Check if a given ID already exists in the list of test cases.
261     """
262     idl = get_id_list(alltests)
263     return (any(newid == x for x in idl))
264
265
266 def generate_case_ids(alltests):
267     """
268     If a test case has a blank ID field, generate a random hex ID for it
269     and then write the test cases back to disk.
270     """
271     import random
272     for c in alltests:
273         if (c["id"] == ""):
274             while True:
275                 newid = str('%04x' % random.randrange(16**4))
276                 if (does_id_exist(alltests, newid)):
277                     continue
278                 else:
279                     c['id'] = newid
280                     break
281
282     ufilename = []
283     for c in alltests:
284         if ('filename' in c):
285             ufilename.append(c['filename'])
286     ufilename = get_unique_item(ufilename)
287     for f in ufilename:
288         testlist = []
289         for t in alltests:
290             if 'filename' in t:
291                 if t['filename'] == f:
292                     del t['filename']
293                     testlist.append(t)
294         outfile = open(f, "w")
295         json.dump(testlist, outfile, indent=4)
296         outfile.close()
297
298
299 def get_test_cases(args):
300     """
301     If a test case file is specified, retrieve tests from that file.
302     Otherwise, glob for all json files in subdirectories and load from
303     each one.
304     """
305     import fnmatch
306     if args.file != None:
307         if not os.path.isfile(args.file):
308             print("The specified test case file " + args.file + " does not exist.")
309             exit(1)
310         flist = [args.file]
311     else:
312         flist = []
313         for root, dirnames, filenames in os.walk('tc-tests'):
314             for filename in fnmatch.filter(filenames, '*.json'):
315                 flist.append(os.path.join(root, filename))
316     alltests = list()
317     for casefile in flist:
318         alltests = alltests + (load_from_file(casefile))
319     return alltests
320
321
322 def set_operation_mode(args):
323     """
324     Load the test case data and process remaining arguments to determine
325     what the script should do for this run, and call the appropriate
326     function.
327     """
328     alltests = get_test_cases(args)
329
330     if args.gen_id:
331         idlist = get_id_list(alltests)
332         if (has_blank_ids(idlist)):
333             alltests = generate_case_ids(alltests)
334         else:
335             print("No empty ID fields found in test files.")
336         exit(0)
337
338     duplicate_ids = check_case_id(alltests)
339     if (len(duplicate_ids) > 0):
340         print("The following test case IDs are not unique:")
341         print(str(set(duplicate_ids)))
342         print("Please correct them before continuing.")
343         exit(1)
344
345     ucat = get_test_categories(alltests)
346
347     if args.showID:
348         show_test_case_by_id(alltests, args.showID[0])
349         exit(0)
350
351     if args.execute:
352         target_id = args.execute[0]
353     else:
354         target_id = ""
355
356     if args.category:
357         if (args.category == '+c'):
358             print("Available categories:")
359             print_sll(ucat)
360             exit(0)
361         else:
362             target_category = args.category
363     else:
364         target_category = ""
365
366
367     testcases = get_categorized_testlist(alltests, ucat)
368
369     if args.list:
370         if (len(args.list) == 0):
371             list_test_cases(alltests)
372             exit(0)
373         elif(len(args.list > 0)):
374             if (args.list not in ucat):
375                 print("Unknown category " + args.list)
376                 print("Available categories:")
377                 print_sll(ucat)
378                 exit(1)
379             list_test_cases(testcases[args.list])
380             exit(0)
381
382     if (os.geteuid() != 0):
383         print("This script must be run with root privileges.\n")
384         exit(1)
385
386     ns_create()
387
388     if (len(target_category) == 0):
389         if (len(target_id) > 0):
390             alltests = list(filter(lambda x: target_id in x['id'], alltests))
391             if (len(alltests) == 0):
392                 print("Cannot find a test case with ID matching " + target_id)
393                 exit(1)
394         catresults = test_runner(alltests, args)
395         print("All test results: " + "\n\n" + catresults)
396     elif (len(target_category) > 0):
397         if (target_category == "flower") and args.device == None:
398             print("Please specify a NIC device (-d) to run category flower")
399             exit(1)
400         if (target_category not in ucat):
401             print("Specified category is not present in this file.")
402             exit(1)
403         else:
404             catresults = test_runner(testcases[target_category], args)
405             print("Category " + target_category + "\n\n" + catresults)
406
407     ns_destroy()
408
409
410 def main():
411     """
412     Start of execution; set up argument parser and get the arguments,
413     and start operations.
414     """
415     parser = args_parse()
416     parser = set_args(parser)
417     (args, remaining) = parser.parse_known_args()
418     check_default_settings(args)
419
420     set_operation_mode(args)
421
422     exit(0)
423
424
425 if __name__ == "__main__":
426     main()