test_lib_crc: speed up tests using multiprocessing.

This commit is contained in:
Catherine 2024-04-10 03:28:08 +00:00
parent b6f51d269e
commit 1b81a47b69

View file

@ -1,6 +1,7 @@
# amaranth: UnusedElaboratable=no
import unittest
import concurrent.futures
from amaranth.sim import *
from amaranth.lib.crc import Algorithm, Processor, catalog
@ -229,35 +230,41 @@ class CRCTestCase(unittest.TestCase):
crc = catalog.CRC8_AUTOSAR()
crc.compute([3, 4, 256])
def for_each_crc_concurrent(self, f):
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = {executor.submit(f, crc) for crc in CRCS}
for future in concurrent.futures.as_completed(futures):
future.result()
@staticmethod
def perform_test_crc_bytes(name):
crc = getattr(catalog, name)(data_width=8).create()
check = CRC_CHECKS[name][0]
def process():
for word in b"123456789":
yield crc.start.eq(word == b"1")
yield crc.data.eq(word)
yield crc.valid.eq(1)
yield Tick()
yield crc.valid.eq(0)
yield Tick()
assert (yield crc.crc) == check
sim = Simulator(crc)
sim.add_testbench(process)
sim.add_clock(1e-6)
sim.run()
def test_crc_bytes(self):
"""
Verify CRC generation by computing the check value for each CRC
in the catalogue with byte-sized inputs.
"""
for name in CRCS:
crc = getattr(catalog, name)(data_width=8).create()
check = CRC_CHECKS[name][0]
self.for_each_crc_concurrent(self.perform_test_crc_bytes)
def process():
for word in b"123456789":
yield crc.start.eq(word == b"1")
yield crc.data.eq(word)
yield crc.valid.eq(1)
yield Tick()
yield crc.valid.eq(0)
yield Tick()
self.assertEqual((yield crc.crc), check)
sim = Simulator(crc)
sim.add_testbench(process)
sim.add_clock(1e-6)
sim.run()
def test_crc_words(self):
"""
Verify CRC generation for non-byte-sized data by computing a check
value for 1, 2, 4, 16, 32, and 64-bit inputs.
"""
@staticmethod
def perform_test_crc_words(name):
# We can't use the catalogue check value since it requires 8-bit
# inputs, so we'll instead use an input of b"12345678".
data = b"12345678"
@ -267,70 +274,18 @@ class CRCTestCase(unittest.TestCase):
bits = "".join(f"{x:08b}" for x in data)
bits_r = "".join(f"{x:08b}"[::-1] for x in data)
for name in CRCS:
for m in (1, 2, 4, 16, 32, 64):
algo = getattr(catalog, name)
crc = algo(data_width=m).create()
# Use a SoftwareCRC with byte inputs to compute new checks.
swcrc = algo(data_width=8)
check = swcrc.compute(data)
# Chunk input bits into m-bit words, reflecting if needed.
if algo.reflect_input:
d = [bits_r[i : i+m][::-1] for i in range(0, len(bits), m)]
else:
d = [bits[i : i+m] for i in range(0, len(bits), m)]
words = [int(x, 2) for x in d]
def process():
yield crc.start.eq(1)
yield Tick()
yield crc.start.eq(0)
for word in words:
yield crc.data.eq(word)
yield crc.valid.eq(1)
yield Tick()
yield crc.valid.eq(0)
yield Tick()
self.assertEqual((yield crc.crc), check)
sim = Simulator(crc)
sim.add_testbench(process)
sim.add_clock(1e-6)
sim.run()
def test_crc_match(self):
"""Verify match_detected output detects valid codewords."""
for name in CRCS:
for m in (1, 2, 4, 16, 32, 64):
algo = getattr(catalog, name)
n = algo.crc_width
m = 8 if n % 8 == 0 else 1
crc = algo(data_width=m).create()
check = CRC_CHECKS[name][0]
if m == 8:
# For CRCs which are multiples of one byte wide, we can easily
# append the correct checksum in bytes.
check_b = check.to_bytes(n // 8, "little" if algo.reflect_output else "big")
words = b"123456789" + check_b
# Use a SoftwareCRC with byte inputs to compute new checks.
swcrc = algo(data_width=8)
check = swcrc.compute(data)
# Chunk input bits into m-bit words, reflecting if needed.
if algo.reflect_input:
d = [bits_r[i : i+m][::-1] for i in range(0, len(bits), m)]
else:
# For other CRC sizes, use single-bit input data.
if algo.reflect_output:
check_b = check.to_bytes((n + 7)//8, "little")
if not algo.reflect_input:
# For cross-endian CRCs, flip the CRC bits separately.
check_b = bytearray(int(f"{x:08b}"[::-1], 2) for x in check_b)
else:
shift = 8 - (n % 8)
check_b = (check << shift).to_bytes((n + 7)//8, "big")
# No catalogue CRCs have ref_in but not ref_out.
codeword = b"123456789" + check_b
words = []
for byte in codeword:
if algo.reflect_input:
words += [int(x) for x in f"{byte:08b}"[::-1]]
else:
words += [int(x) for x in f"{byte:08b}"]
words = words[:72 + n]
d = [bits[i : i+m] for i in range(0, len(bits), m)]
words = [int(x, 2) for x in d]
def process():
yield crc.start.eq(1)
@ -342,9 +297,70 @@ class CRCTestCase(unittest.TestCase):
yield Tick()
yield crc.valid.eq(0)
yield Tick()
self.assertTrue((yield crc.match_detected))
assert (yield crc.crc) == check
sim = Simulator(crc)
sim.add_testbench(process)
sim.add_clock(1e-6)
sim.run()
def test_crc_words(self):
"""
Verify CRC generation for non-byte-sized data by computing a check
value for 1, 2, 4, 16, 32, and 64-bit inputs.
"""
self.for_each_crc_concurrent(self.perform_test_crc_words)
@staticmethod
def perform_test_crc_match(name):
algo = getattr(catalog, name)
n = algo.crc_width
m = 8 if n % 8 == 0 else 1
crc = algo(data_width=m).create()
check = CRC_CHECKS[name][0]
if m == 8:
# For CRCs which are multiples of one byte wide, we can easily
# append the correct checksum in bytes.
check_b = check.to_bytes(n // 8, "little" if algo.reflect_output else "big")
words = b"123456789" + check_b
else:
# For other CRC sizes, use single-bit input data.
if algo.reflect_output:
check_b = check.to_bytes((n + 7)//8, "little")
if not algo.reflect_input:
# For cross-endian CRCs, flip the CRC bits separately.
check_b = bytearray(int(f"{x:08b}"[::-1], 2) for x in check_b)
else:
shift = 8 - (n % 8)
check_b = (check << shift).to_bytes((n + 7)//8, "big")
# No catalogue CRCs have ref_in but not ref_out.
codeword = b"123456789" + check_b
words = []
for byte in codeword:
if algo.reflect_input:
words += [int(x) for x in f"{byte:08b}"[::-1]]
else:
words += [int(x) for x in f"{byte:08b}"]
words = words[:72 + n]
def process():
yield crc.start.eq(1)
yield Tick()
yield crc.start.eq(0)
for word in words:
yield crc.data.eq(word)
yield crc.valid.eq(1)
yield Tick()
yield crc.valid.eq(0)
yield Tick()
assert (yield crc.match_detected)
sim = Simulator(crc)
sim.add_testbench(process)
sim.add_clock(1e-6)
sim.run()
def test_crc_match(self):
"""Verify match_detected output detects valid codewords."""
self.for_each_crc_concurrent(self.perform_test_crc_match)