From 1b81a47b6908df359c4bd1be38ee9e512445d2d9 Mon Sep 17 00:00:00 2001 From: Catherine Date: Wed, 10 Apr 2024 03:28:08 +0000 Subject: [PATCH] test_lib_crc: speed up tests using `multiprocessing`. --- tests/test_lib_crc.py | 186 +++++++++++++++++++++++------------------- 1 file changed, 101 insertions(+), 85 deletions(-) diff --git a/tests/test_lib_crc.py b/tests/test_lib_crc.py index 96b57fb..78cb118 100644 --- a/tests/test_lib_crc.py +++ b/tests/test_lib_crc.py @@ -1,6 +1,7 @@ # amaranth: UnusedElaboratable=no import unittest +import concurrent.futures from amaranth.sim import * from amaranth.lib.crc import Algorithm, Processor, catalog @@ -229,35 +230,41 @@ class CRCTestCase(unittest.TestCase): crc = catalog.CRC8_AUTOSAR() crc.compute([3, 4, 256]) + def for_each_crc_concurrent(self, f): + with concurrent.futures.ProcessPoolExecutor() as executor: + futures = {executor.submit(f, crc) for crc in CRCS} + for future in concurrent.futures.as_completed(futures): + future.result() + + @staticmethod + def perform_test_crc_bytes(name): + crc = getattr(catalog, name)(data_width=8).create() + check = CRC_CHECKS[name][0] + + def process(): + for word in b"123456789": + yield crc.start.eq(word == b"1") + yield crc.data.eq(word) + yield crc.valid.eq(1) + yield Tick() + yield crc.valid.eq(0) + yield Tick() + assert (yield crc.crc) == check + + sim = Simulator(crc) + sim.add_testbench(process) + sim.add_clock(1e-6) + sim.run() + def test_crc_bytes(self): """ Verify CRC generation by computing the check value for each CRC in the catalogue with byte-sized inputs. """ - for name in CRCS: - crc = getattr(catalog, name)(data_width=8).create() - check = CRC_CHECKS[name][0] + self.for_each_crc_concurrent(self.perform_test_crc_bytes) - def process(): - for word in b"123456789": - yield crc.start.eq(word == b"1") - yield crc.data.eq(word) - yield crc.valid.eq(1) - yield Tick() - yield crc.valid.eq(0) - yield Tick() - self.assertEqual((yield crc.crc), check) - - sim = Simulator(crc) - sim.add_testbench(process) - sim.add_clock(1e-6) - sim.run() - - def test_crc_words(self): - """ - Verify CRC generation for non-byte-sized data by computing a check - value for 1, 2, 4, 16, 32, and 64-bit inputs. - """ + @staticmethod + def perform_test_crc_words(name): # We can't use the catalogue check value since it requires 8-bit # inputs, so we'll instead use an input of b"12345678". data = b"12345678" @@ -267,70 +274,18 @@ class CRCTestCase(unittest.TestCase): bits = "".join(f"{x:08b}" for x in data) bits_r = "".join(f"{x:08b}"[::-1] for x in data) - for name in CRCS: - for m in (1, 2, 4, 16, 32, 64): - algo = getattr(catalog, name) - crc = algo(data_width=m).create() - # Use a SoftwareCRC with byte inputs to compute new checks. - swcrc = algo(data_width=8) - check = swcrc.compute(data) - # Chunk input bits into m-bit words, reflecting if needed. - if algo.reflect_input: - d = [bits_r[i : i+m][::-1] for i in range(0, len(bits), m)] - else: - d = [bits[i : i+m] for i in range(0, len(bits), m)] - words = [int(x, 2) for x in d] - - def process(): - yield crc.start.eq(1) - yield Tick() - yield crc.start.eq(0) - for word in words: - yield crc.data.eq(word) - yield crc.valid.eq(1) - yield Tick() - yield crc.valid.eq(0) - yield Tick() - self.assertEqual((yield crc.crc), check) - - sim = Simulator(crc) - sim.add_testbench(process) - sim.add_clock(1e-6) - sim.run() - - def test_crc_match(self): - """Verify match_detected output detects valid codewords.""" - for name in CRCS: + for m in (1, 2, 4, 16, 32, 64): algo = getattr(catalog, name) - n = algo.crc_width - m = 8 if n % 8 == 0 else 1 crc = algo(data_width=m).create() - check = CRC_CHECKS[name][0] - - if m == 8: - # For CRCs which are multiples of one byte wide, we can easily - # append the correct checksum in bytes. - check_b = check.to_bytes(n // 8, "little" if algo.reflect_output else "big") - words = b"123456789" + check_b + # Use a SoftwareCRC with byte inputs to compute new checks. + swcrc = algo(data_width=8) + check = swcrc.compute(data) + # Chunk input bits into m-bit words, reflecting if needed. + if algo.reflect_input: + d = [bits_r[i : i+m][::-1] for i in range(0, len(bits), m)] else: - # For other CRC sizes, use single-bit input data. - if algo.reflect_output: - check_b = check.to_bytes((n + 7)//8, "little") - if not algo.reflect_input: - # For cross-endian CRCs, flip the CRC bits separately. - check_b = bytearray(int(f"{x:08b}"[::-1], 2) for x in check_b) - else: - shift = 8 - (n % 8) - check_b = (check << shift).to_bytes((n + 7)//8, "big") - # No catalogue CRCs have ref_in but not ref_out. - codeword = b"123456789" + check_b - words = [] - for byte in codeword: - if algo.reflect_input: - words += [int(x) for x in f"{byte:08b}"[::-1]] - else: - words += [int(x) for x in f"{byte:08b}"] - words = words[:72 + n] + d = [bits[i : i+m] for i in range(0, len(bits), m)] + words = [int(x, 2) for x in d] def process(): yield crc.start.eq(1) @@ -342,9 +297,70 @@ class CRCTestCase(unittest.TestCase): yield Tick() yield crc.valid.eq(0) yield Tick() - self.assertTrue((yield crc.match_detected)) + assert (yield crc.crc) == check sim = Simulator(crc) sim.add_testbench(process) sim.add_clock(1e-6) sim.run() + + def test_crc_words(self): + """ + Verify CRC generation for non-byte-sized data by computing a check + value for 1, 2, 4, 16, 32, and 64-bit inputs. + """ + self.for_each_crc_concurrent(self.perform_test_crc_words) + + @staticmethod + def perform_test_crc_match(name): + algo = getattr(catalog, name) + n = algo.crc_width + m = 8 if n % 8 == 0 else 1 + crc = algo(data_width=m).create() + check = CRC_CHECKS[name][0] + + if m == 8: + # For CRCs which are multiples of one byte wide, we can easily + # append the correct checksum in bytes. + check_b = check.to_bytes(n // 8, "little" if algo.reflect_output else "big") + words = b"123456789" + check_b + else: + # For other CRC sizes, use single-bit input data. + if algo.reflect_output: + check_b = check.to_bytes((n + 7)//8, "little") + if not algo.reflect_input: + # For cross-endian CRCs, flip the CRC bits separately. + check_b = bytearray(int(f"{x:08b}"[::-1], 2) for x in check_b) + else: + shift = 8 - (n % 8) + check_b = (check << shift).to_bytes((n + 7)//8, "big") + # No catalogue CRCs have ref_in but not ref_out. + codeword = b"123456789" + check_b + words = [] + for byte in codeword: + if algo.reflect_input: + words += [int(x) for x in f"{byte:08b}"[::-1]] + else: + words += [int(x) for x in f"{byte:08b}"] + words = words[:72 + n] + + def process(): + yield crc.start.eq(1) + yield Tick() + yield crc.start.eq(0) + for word in words: + yield crc.data.eq(word) + yield crc.valid.eq(1) + yield Tick() + yield crc.valid.eq(0) + yield Tick() + assert (yield crc.match_detected) + + sim = Simulator(crc) + sim.add_testbench(process) + sim.add_clock(1e-6) + sim.run() + + def test_crc_match(self): + """Verify match_detected output detects valid codewords.""" + self.for_each_crc_concurrent(self.perform_test_crc_match)