Separate test_runner.py's per-process state

Separates per-process state from the TestRunner's state. This makes it
easier to avoid re-creating objects which should be per-process
singletons, like the SkiaGoldTester.

This change fixes the immediate issue with the SkiaGoldTester, but more
generally, the TestRunner instance shouldn't be pickled to workers; a
future change will fix the remaining issues.

Bug: pdfium:1641
Change-Id: I729a80b05aa7c7d6fa0ad868f4d89e160044342c
Reviewed-on: https://pdfium-review.googlesource.com/c/pdfium/+/99590
Commit-Queue: K. Moon <kmoon@chromium.org>
Reviewed-by: Lei Zhang <thestig@chromium.org>
diff --git a/testing/tools/test_runner.py b/testing/tools/test_runner.py
index b1b1986..08e0783 100644
--- a/testing/tools/test_runner.py
+++ b/testing/tools/test_runner.py
@@ -64,6 +64,7 @@
       os.remove(f)
 
 
+# TODO(crbug.com/pdfium/1641): Don't pickle TestRunner.
 class TestRunner:
 
   def __init__(self, dirname):
@@ -75,23 +76,6 @@
     self.test_type = dirname
     self.delete_output_on_success = False
     self.enforce_expected_images = False
-    self.skia_tester = None
-
-  def GetSkiaGoldTester(self):
-    if not self.skia_tester:
-      self.skia_tester = skia_gold.SkiaGoldTester(
-          source_type=self.test_type,
-          skia_gold_args=self.options,
-          process_name=multiprocessing.current_process().name)
-    return self.skia_tester
-
-  def RunSkia(self, test_case):
-    skia_success = self.GetSkiaGoldTester().UploadTestResultToSkiaGold(
-        test_case.test_id, test_case.input_path)
-    sys.stdout.flush()
-
-    return test_case.NewResult(
-        result_types.PASS if skia_success else result_types.FAIL)
 
   def GenerateAndTest(self, test_case):
     """Generate test input and run pdfium_test."""
@@ -472,7 +456,12 @@
       # Clear out and create top level gold output directory before starting
       skia_gold.clear_gold_output_dir(self.options.gold_output_dir)
 
-    with multiprocessing.Pool(self.options.num_workers) as pool:
+    per_process_state_args = [self.test_type, self.options]
+    per_process_state = _PerProcessState(*per_process_state_args)
+    with multiprocessing.Pool(
+        processes=self.options.num_workers,
+        initializer=_InitializePerProcessState,
+        initargs=per_process_state_args) as pool:
       skia_gold_test_cases = TestCaseManager()
       for result in pool.imap(
           _WrapKeyboardInterrupt(self.GenerateAndTest), self.test_cases):
@@ -486,7 +475,7 @@
             skia_gold_test_cases.NewTestCase(artifact.image_path)
 
       for result in pool.imap(
-          _WrapKeyboardInterrupt(self.RunSkia), skia_gold_test_cases):
+          _WrapKeyboardInterrupt(_RunSkiaTest), skia_gold_test_cases):
         if result.IsPass():
           test_case = skia_gold_test_cases.GetTestCase(result.test_id)
           if self.test_suppressor.IsResultSuppressed(test_case.input_path):
@@ -502,7 +491,7 @@
     # _PrintSummary() is called at the end
     # TODO(crbug.com/pdfium/1657): Once resolved, move _PrintSummary() back
     # down to the end
-    self._PrintSummary()
+    self._PrintSummary(per_process_state)
 
     if self.surprises:
       self.surprises.sort()
@@ -534,7 +523,7 @@
 
     return 0
 
-  def _PrintSummary(self):
+  def _PrintSummary(self, per_process_state):
     number_test_cases = len(self.test_cases)
     number_failures = len(self.failures)
     number_suppressed = len(self.result_suppressed_cases)
@@ -555,7 +544,7 @@
       print('  Skia Gold Successes:', number_gold_successes)
       print('  Skia Gold Surprises:', number_gold_surprises)
       print('  Skia Gold Failures:', number_gold_failures)
-      skia_tester = self.GetSkiaGoldTester()
+      skia_tester = per_process_state.GetSkiaGoldTester()
       if self.skia_gold_failures and skia_tester.IsTryjobRun():
         cl_triage_link = skia_tester.GetCLTriageLink()
         print('  Triage link for CL:', cl_triage_link)
@@ -572,6 +561,53 @@
     self.enforce_expected_images = new_value
 
 
+def _RunSkiaTest(test_case):
+  """Runs a Skia Gold test case."""
+  skia_tester = _per_process_state.GetSkiaGoldTester()
+  skia_success = skia_tester.UploadTestResultToSkiaGold(test_case.test_id,
+                                                        test_case.input_path)
+  sys.stdout.flush()
+
+  return test_case.NewResult(
+      result_types.PASS if skia_success else result_types.FAIL)
+
+
+# `_PerProcessState` singleton. This is initialized when creating the
+# `multiprocessing.Pool()`. `TestRunner.Run()` creates its own separate
+# instance of `_PerProcessState` as well.
+_per_process_state = None
+
+
+def _InitializePerProcessState(test_type, options):
+  """Initializes the `_per_process_state` singleton."""
+  global _per_process_state
+  assert not _per_process_state
+  _per_process_state = _PerProcessState(test_type, options)
+
+
+class _PerProcessState:
+  """State defined per process."""
+
+  def __init__(self, test_type, options):
+    self.test_type = test_type
+    self.options = options
+    self.process_name = multiprocessing.current_process().name
+
+    self.skia_tester = None
+
+  def __getstate__(self):
+    raise RuntimeError('Cannot pickle per-process state')
+
+  def GetSkiaGoldTester(self):
+    """Gets the `SkiaGoldTester` singleton for this worker."""
+    if not self.skia_tester:
+      self.skia_tester = skia_gold.SkiaGoldTester(
+          source_type=self.test_type,
+          skia_gold_args=self.options,
+          process_name=self.process_name)
+    return self.skia_tester
+
+
 @dataclass
 class TestCase:
   """Description of a test case to run.