Skip to main content

foundry_evm/executors/invariant/
campaign.rs

1use super::{
2    FailureKey, InvariantFailureMetrics, InvariantFailures, InvariantFuzzError,
3    InvariantFuzzTestResult, InvariantMetrics,
4};
5use crate::executors::{EarlyExit, corpus::CampaignCorpusEntry};
6use alloy_primitives::{Address, I256, Selector};
7use eyre::{Result, ensure};
8use foundry_evm_coverage::HitMaps;
9use foundry_evm_fuzz::BasicTxDetails;
10use std::{
11    collections::{HashMap, HashSet},
12    sync::{
13        Mutex,
14        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
15    },
16    time::{Duration, Instant},
17};
18
19/// Immutable plan-level description for an invariant campaign.
20///
21/// This is only a planning contract for splitting one logical campaign into worker ranges. It does
22/// not start workers, choose worker counts, or decide corpus/failure persistence.
23#[derive(Clone, Copy, Debug, PartialEq, Eq)]
24pub struct InvariantCampaignSpec {
25    /// Total logical runs configured for the campaign.
26    pub total_runs: u32,
27}
28
29impl InvariantCampaignSpec {
30    pub const fn new(total_runs: u32) -> Self {
31        Self { total_runs }
32    }
33
34    /// Partitions the logical campaign into contiguous worker run ranges.
35    ///
36    /// This only describes work assignment. It does not start worker execution and does not
37    /// attribute failures to worker/run origins.
38    pub fn worker_plans(self, workers: usize) -> Result<Vec<InvariantWorkerPlan>> {
39        ensure!(workers > 0, "invariant campaign requires at least one worker");
40
41        if self.total_runs == 0 {
42            return Ok(vec![InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 0 }]);
43        }
44
45        let worker_count = workers.min(self.total_runs as usize) as u32;
46        let base_runs = self.total_runs / worker_count;
47        let extra_runs = self.total_runs % worker_count;
48
49        let mut first_global_run = 0;
50        let mut plans = Vec::with_capacity(worker_count as usize);
51        for worker_id in 0..worker_count {
52            let runs = base_runs + u32::from(worker_id < extra_runs);
53            plans.push(InvariantWorkerPlan { worker_id, first_global_run, runs });
54            first_global_run += runs;
55        }
56
57        debug_assert_eq!(first_global_run, self.total_runs);
58        Ok(plans)
59    }
60}
61
/// Fixed slice of the logical campaign handed to a single worker.
///
/// A plan covers the half-open range `[first_global_run, first_global_run + runs)`.
/// The worker with id `0` is the master worker and owns master-only artifacts, for example the
/// persisted corpus replay counts.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct InvariantWorkerPlan {
    /// Identifier of the worker that owns this range.
    pub worker_id: u32,
    /// Index of the first logical run in the range.
    pub first_global_run: u32,
    /// Number of logical runs in the range.
    pub runs: u32,
}
73
74/// Shared state used only to coordinate invariant worker execution.
75pub struct InvariantCampaignState {
76    started_at: Instant,
77    timeout: Option<Duration>,
78    total_runs: AtomicU32,
79    total_txs: AtomicU64,
80    total_gas: AtomicU64,
81    terminal_stop: AtomicBool,
82    global_early_exit: EarlyExit,
83    last_metrics_report: Mutex<Instant>,
84    failure_metrics: Mutex<CampaignFailureMetrics>,
85}
86
87#[derive(Default)]
88struct CampaignFailureMetrics {
89    metrics: InvariantFailureMetrics,
90    handler_sites: HashSet<(Address, Selector)>,
91}
92
93impl InvariantCampaignState {
94    pub fn new(early_exit: EarlyExit, timeout: Option<u32>) -> Self {
95        let started_at = Instant::now();
96        Self {
97            started_at,
98            timeout: timeout.map(|timeout| Duration::from_secs(timeout.into())),
99            total_runs: AtomicU32::new(0),
100            total_txs: AtomicU64::new(0),
101            total_gas: AtomicU64::new(0),
102            terminal_stop: AtomicBool::new(false),
103            global_early_exit: early_exit,
104            last_metrics_report: Mutex::new(started_at),
105            failure_metrics: Mutex::new(CampaignFailureMetrics::default()),
106        }
107    }
108
109    pub fn increment_runs(&self) -> u32 {
110        self.total_runs.fetch_add(1, Ordering::Relaxed) + 1
111    }
112
113    #[cfg(test)]
114    pub fn total_runs(&self) -> u32 {
115        self.total_runs.load(Ordering::Relaxed)
116    }
117
118    pub fn record_call(&self, gas_used: u64) {
119        self.total_txs.fetch_add(1, Ordering::Relaxed);
120        self.total_gas.fetch_add(gas_used, Ordering::Relaxed);
121    }
122
123    pub fn throughput_totals(&self) -> (u64, u64) {
124        (self.total_txs.load(Ordering::Relaxed), self.total_gas.load(Ordering::Relaxed))
125    }
126
127    pub fn elapsed(&self) -> Duration {
128        self.started_at.elapsed()
129    }
130
131    pub const fn is_timed_campaign(&self) -> bool {
132        self.timeout.is_some()
133    }
134
135    pub fn is_timed_out(&self) -> bool {
136        self.timeout.is_some_and(|duration| self.elapsed() > duration)
137    }
138
139    pub fn should_stop(&self) -> bool {
140        self.global_early_exit.should_stop()
141            || self.terminal_stop.load(Ordering::Relaxed)
142            || self.is_timed_out()
143    }
144
145    pub fn request_terminal_stop(&self) {
146        self.terminal_stop.store(true, Ordering::Relaxed);
147    }
148
149    pub fn should_emit_metrics_report(&self, interval: Duration) -> bool {
150        let mut last_report =
151            self.last_metrics_report.lock().expect("metrics report lock poisoned");
152        if last_report.elapsed() <= interval {
153            return false;
154        }
155
156        *last_report = Instant::now();
157        true
158    }
159
160    pub(super) fn record_invariant_failure(
161        &self,
162        invariant_name: &str,
163        target: &str,
164        reason: &str,
165    ) {
166        let mut failure_metrics =
167            self.failure_metrics.lock().expect("failure metrics lock poisoned");
168        if !failure_metrics.metrics.unique_failures.contains(invariant_name) {
169            failure_metrics.metrics.record_failure(invariant_name, target, reason);
170        }
171    }
172
173    pub(super) fn sync_handler_failures(&self, failures: &InvariantFailures) {
174        let mut failure_metrics =
175            self.failure_metrics.lock().expect("failure metrics lock poisoned");
176        for key in failures.failures.keys() {
177            let FailureKey::Handler(addr, selector) = key else { continue };
178            failure_metrics.handler_sites.insert((*addr, *selector));
179        }
180        failure_metrics.metrics.broken_handlers = failure_metrics.handler_sites.len();
181    }
182
183    pub(super) fn failure_metrics(&self) -> InvariantFailureMetrics {
184        self.failure_metrics.lock().expect("failure metrics lock poisoned").metrics.clone()
185    }
186
187    pub const fn early_exit(&self) -> &EarlyExit {
188        &self.global_early_exit
189    }
190}
191
192/// Output produced by one invariant worker.
193///
194/// This is a data envelope for aggregation only. It does not imply that this module executed the
195/// worker, shrank failures, or wrote any persisted corpus/failure files.
196#[derive(Debug)]
197pub struct InvariantWorkerOutput {
198    pub plan: InvariantWorkerPlan,
199    pub result: InvariantFuzzTestResult,
200    pub corpus_entries: Vec<CampaignCorpusEntry>,
201}
202
203impl InvariantWorkerOutput {
204    #[cfg(test)]
205    pub const fn new(plan: InvariantWorkerPlan, result: InvariantFuzzTestResult) -> Self {
206        Self { plan, result, corpus_entries: Vec::new() }
207    }
208}
209
210/// Merges worker outputs back into one logical invariant campaign result.
211///
212/// Merge policy:
213/// - outputs are folded in `first_global_run` order;
214/// - predicate failures keep the first failure in logical run order;
215/// - handler assertion failures keep the shorter reproducer, with equal lengths preserving the
216///   earlier logical worker;
217/// - optimization mode keeps the maximum value, with ties preserving the earlier logical worker;
218/// - `failed_corpus_replays` is a master-worker-only value from worker `0`;
219/// - run/call counts, reverts, gas traces, selector metrics, and line coverage accumulate into the
220///   logical campaign result.
221#[derive(Debug)]
222pub struct InvariantCampaignAggregator {
223    spec: InvariantCampaignSpec,
224    outputs: Vec<InvariantWorkerOutput>,
225}
226
227impl InvariantCampaignAggregator {
228    pub const fn new(spec: InvariantCampaignSpec) -> Self {
229        Self { spec, outputs: Vec::new() }
230    }
231
232    pub fn push(&mut self, output: InvariantWorkerOutput) {
233        self.outputs.push(output);
234    }
235
236    /// Validates the collected worker ranges and folds them into one logical campaign result.
237    #[cfg(test)]
238    pub fn finish(self) -> Result<InvariantFuzzTestResult> {
239        Ok(self.finish_with_corpus_entries()?.0)
240    }
241
242    /// Validates the collected worker ranges and folds them into one logical campaign result with
243    /// corpus artifacts selected in logical worker order.
244    pub fn finish_with_corpus_entries(
245        mut self,
246    ) -> Result<(InvariantFuzzTestResult, Vec<CampaignCorpusEntry>)> {
247        ensure!(!self.outputs.is_empty(), "missing invariant worker output");
248
249        self.outputs.sort_by_key(|output| output.plan.first_global_run);
250        ensure_outputs_cover_campaign(self.spec, &self.outputs)?;
251        fold_outputs(self.outputs)
252    }
253
254    /// Folds timeout worker outputs without requiring full logical campaign coverage.
255    ///
256    /// Timeout campaigns share a wall-clock deadline across workers. When the deadline hits, any
257    /// worker may have completed fewer than its assigned runs, so the original static ranges can
258    /// contain gaps. The merge still validates worker identity and preserves deterministic worker
259    /// order, but final run count is derived from the completed worker counters.
260    pub fn finish_partial_with_corpus_entries(
261        mut self,
262    ) -> Result<(InvariantFuzzTestResult, Vec<CampaignCorpusEntry>)> {
263        ensure!(!self.outputs.is_empty(), "missing invariant worker output");
264
265        self.outputs.sort_by_key(|output| output.plan.first_global_run);
266        ensure_worker_ids_are_valid(&self.outputs)?;
267        fold_outputs(self.outputs)
268    }
269}
270
271fn fold_outputs(
272    outputs: Vec<InvariantWorkerOutput>,
273) -> Result<(InvariantFuzzTestResult, Vec<CampaignCorpusEntry>)> {
274    let workers = outputs.len();
275    let mut errors = HashMap::default();
276    let mut handler_errors = HashMap::default();
277    let mut runs = 0;
278    let mut calls = 0;
279    let mut reverts = 0;
280    let mut last_run_inputs = Vec::new();
281    let mut gas_report_traces = Vec::new();
282    let mut line_coverage = None;
283    let mut metrics = HashMap::default();
284    let mut corpus_entries = Vec::new();
285    let mut failed_corpus_replays = 0;
286    let mut optimization_best = None;
287
288    for InvariantWorkerOutput { plan, result, corpus_entries: worker_entries } in outputs {
289        if plan.worker_id == 0 {
290            failed_corpus_replays = result.failed_corpus_replays;
291        }
292        for (invariant, error) in result.errors {
293            errors.entry(invariant).or_insert(error);
294        }
295        merge_handler_errors(&mut handler_errors, result.handler_errors);
296        corpus_entries.extend(worker_entries);
297        runs += result.runs;
298        calls += result.calls;
299        reverts += result.reverts;
300        if !result.last_run_inputs.is_empty() {
301            last_run_inputs = result.last_run_inputs;
302        }
303        gas_report_traces.extend(result.gas_report_traces);
304        HitMaps::merge_opt(&mut line_coverage, result.line_coverage);
305        merge_metrics(&mut metrics, result.metrics);
306        merge_optimization(
307            &mut optimization_best,
308            result.optimization_best_value,
309            result.optimization_best_sequence,
310        );
311    }
312    let (optimization_best_value, optimization_best_sequence) =
313        optimization_best.map(|(value, sequence)| (Some(value), sequence)).unwrap_or_default();
314    Ok((
315        InvariantFuzzTestResult::new(
316            errors,
317            handler_errors,
318            runs,
319            calls,
320            reverts,
321            last_run_inputs,
322            gas_report_traces,
323            line_coverage,
324            metrics,
325            failed_corpus_replays,
326            workers,
327            optimization_best_value,
328            optimization_best_sequence,
329        ),
330        corpus_entries,
331    ))
332}
333
334fn ensure_outputs_cover_campaign(
335    spec: InvariantCampaignSpec,
336    outputs: &[InvariantWorkerOutput],
337) -> Result<()> {
338    ensure_worker_ids_are_valid(outputs)?;
339
340    if spec.total_runs == 0 {
341        ensure!(
342            outputs.len() == 1
343                && outputs[0].plan.first_global_run == 0
344                && outputs[0].plan.runs == 0,
345            "invariant worker outputs do not cover the logical campaign"
346        );
347        return Ok(());
348    }
349
350    let mut next_global_run = 0;
351    for output in outputs {
352        ensure!(output.plan.runs > 0, "invariant worker outputs do not cover the logical campaign");
353        ensure!(
354            output.plan.first_global_run == next_global_run,
355            "invariant worker outputs do not cover the logical campaign"
356        );
357        next_global_run = next_global_run
358            .checked_add(output.plan.runs)
359            .ok_or_else(|| eyre::eyre!("invariant worker output range overflows"))?;
360    }
361
362    ensure!(
363        next_global_run == spec.total_runs,
364        "invariant worker outputs do not cover the logical campaign"
365    );
366    Ok(())
367}
368
369fn ensure_worker_ids_are_valid(outputs: &[InvariantWorkerOutput]) -> Result<()> {
370    let mut seen = HashSet::with_capacity(outputs.len());
371    for output in outputs {
372        ensure!(
373            seen.insert(output.plan.worker_id),
374            "duplicate invariant worker output for worker {}",
375            output.plan.worker_id
376        );
377    }
378
379    ensure!(seen.contains(&0), "missing invariant master worker output");
380    Ok(())
381}
382
383/// Deduplicates handler assertion failures by site, keeping the shorter reproducer.
384/// Equal-length reproducers keep the one already inserted, which is the earlier logical worker
385/// because the caller folds worker outputs in `first_global_run` order.
386fn merge_handler_errors(
387    merged: &mut HashMap<(Address, Selector), InvariantFuzzError>,
388    worker_errors: HashMap<(Address, Selector), InvariantFuzzError>,
389) {
390    for (site, error) in worker_errors {
391        let candidate_len = handler_error_sequence_len(&error);
392        if merged
393            .get(&site)
394            .is_none_or(|existing| handler_error_sequence_len(existing) > candidate_len)
395        {
396            merged.insert(site, error);
397        }
398    }
399}
400
401/// Adds worker-local selector metrics into the logical campaign totals.
402fn merge_metrics(
403    merged: &mut HashMap<String, InvariantMetrics>,
404    worker_metrics: HashMap<String, InvariantMetrics>,
405) {
406    for (selector, metrics) in worker_metrics {
407        let entry = merged.entry(selector).or_default();
408        entry.calls += metrics.calls;
409        entry.reverts += metrics.reverts;
410        entry.discards += metrics.discards;
411    }
412}
413
414/// Keeps the best optimization value, using logical run order to break ties.
415fn merge_optimization(
416    best: &mut Option<(I256, Vec<BasicTxDetails>)>,
417    candidate_value: Option<I256>,
418    candidate_sequence: Vec<BasicTxDetails>,
419) {
420    let Some(candidate_value) = candidate_value else {
421        return;
422    };
423
424    if best.as_ref().is_none_or(|(best, _)| candidate_value > *best) {
425        *best = Some((candidate_value, candidate_sequence));
426    }
427}
428
429fn handler_error_sequence_len(error: &InvariantFuzzError) -> usize {
430    error.as_handler_assertion().map_or(usize::MAX, |failure| failure.call_sequence.len())
431}
432
433#[cfg(test)]
434mod tests {
435    use super::{
436        super::error::{FailedInvariantCaseData, HandlerAssertionFailure},
437        *,
438    };
439    use alloy_primitives::{B256, Bytes};
440    use foundry_evm_coverage::HitMap;
441    use foundry_evm_fuzz::CallDetails;
442    use proptest::test_runner::TestError;
443    use revm_inspectors::tracing::CallTraceArena;
444
445    fn empty_result(reverts: usize, failed_corpus_replays: usize) -> InvariantFuzzTestResult {
446        InvariantFuzzTestResult::new(
447            HashMap::default(),
448            HashMap::default(),
449            0,
450            0,
451            reverts,
452            Vec::new(),
453            Vec::new(),
454            None,
455            HashMap::default(),
456            failed_corpus_replays,
457            1,
458            None,
459            Vec::new(),
460        )
461    }
462
463    fn basic_tx(sender: u8) -> BasicTxDetails {
464        BasicTxDetails {
465            warp: None,
466            roll: None,
467            sender: Address::repeat_byte(sender),
468            call_details: CallDetails {
469                target: Address::repeat_byte(sender.wrapping_add(1)),
470                calldata: Bytes::from(vec![0, 0, 0, sender]),
471                value: None,
472            },
473        }
474    }
475
476    fn hit_maps(pc: u32, hits: u32) -> HitMaps {
477        let mut hit_map = HitMap::new(Bytes::from_static(&[0]));
478        hit_map.hits(pc, hits);
479
480        let mut maps = HitMaps::default();
481        maps.insert(B256::ZERO, hit_map);
482        maps
483    }
484
485    /// Builds a worker-local result fixture with the fields merged by the aggregator.
486    fn worker_result(
487        reverts: usize,
488        last_input_sender: u8,
489        metric_name: &str,
490        metrics: InvariantMetrics,
491        coverage_hits: u32,
492        failed_corpus_replays: usize,
493    ) -> InvariantFuzzTestResult {
494        let mut result = empty_result(reverts, failed_corpus_replays);
495        result.runs = 1;
496        result.calls = metrics.calls;
497        result.last_run_inputs = vec![basic_tx(last_input_sender)];
498        result.gas_report_traces.push(vec![CallTraceArena::default()]);
499        result.line_coverage = Some(hit_maps(7, coverage_hits));
500        result.metrics.insert(metric_name.to_string(), metrics);
501        result
502    }
503
504    fn sequence(len: usize, first_sender: u8) -> Vec<BasicTxDetails> {
505        (0..len).map(|idx| basic_tx(first_sender.wrapping_add(idx as u8))).collect()
506    }
507
508    /// Builds a predicate failure fixture with a reproducible call sequence.
509    fn predicate_error(reason: &str, sequence_len: usize) -> InvariantFuzzError {
510        InvariantFuzzError::BrokenInvariant(FailedInvariantCaseData {
511            test_error: TestError::Fail(reason.to_string().into(), sequence(sequence_len, 0x80)),
512            return_reason: reason.to_string().into(),
513            revert_reason: reason.to_string(),
514            addr: Address::repeat_byte(0x70),
515            calldata: Bytes::new(),
516            inner_sequence: Vec::new(),
517            shrink_run_limit: 0,
518            fail_on_revert: false,
519            assertion_failure: false,
520        })
521    }
522
523    /// Builds a handler assertion fixture with a reproducible call sequence.
524    fn handler_error(
525        reverter: Address,
526        selector: Selector,
527        sequence_len: usize,
528        reason: &str,
529    ) -> InvariantFuzzError {
530        InvariantFuzzError::HandlerAssertion(HandlerAssertionFailure {
531            reverter,
532            selector,
533            call_sequence: sequence(sequence_len, 0x90),
534            original_sequence_len: sequence_len,
535            revert_reason: reason.to_string(),
536            edge_fingerprint: B256::ZERO,
537        })
538    }
539
540    fn one_worker_plan(total_runs: u32) -> InvariantWorkerPlan {
541        let mut plans = InvariantCampaignSpec::new(total_runs).worker_plans(1).unwrap();
542        assert_eq!(plans.len(), 1);
543        plans.pop().unwrap()
544    }
545
546    #[test]
547    fn worker_plans_cover_logical_campaign_with_one_worker() {
548        let plan = one_worker_plan(3);
549
550        assert_eq!(plan.worker_id, 0);
551        assert_eq!(plan.first_global_run, 0);
552        assert_eq!(plan.runs, 3);
553    }
554
555    #[test]
556    fn worker_plans_split_runs_evenly() {
557        let plans = InvariantCampaignSpec::new(100).worker_plans(4).unwrap();
558
559        assert_eq!(
560            plans,
561            vec![
562                InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 25 },
563                InvariantWorkerPlan { worker_id: 1, first_global_run: 25, runs: 25 },
564                InvariantWorkerPlan { worker_id: 2, first_global_run: 50, runs: 25 },
565                InvariantWorkerPlan { worker_id: 3, first_global_run: 75, runs: 25 },
566            ]
567        );
568    }
569
570    #[test]
571    fn worker_plans_distribute_remainder_to_earlier_workers() {
572        let plans = InvariantCampaignSpec::new(10).worker_plans(3).unwrap();
573
574        assert_eq!(
575            plans,
576            vec![
577                InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 4 },
578                InvariantWorkerPlan { worker_id: 1, first_global_run: 4, runs: 3 },
579                InvariantWorkerPlan { worker_id: 2, first_global_run: 7, runs: 3 },
580            ]
581        );
582    }
583
584    #[test]
585    fn worker_plans_do_not_create_empty_workers_when_runs_are_available() {
586        let plans = InvariantCampaignSpec::new(2).worker_plans(8).unwrap();
587
588        assert_eq!(
589            plans,
590            vec![
591                InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
592                InvariantWorkerPlan { worker_id: 1, first_global_run: 1, runs: 1 },
593            ]
594        );
595    }
596
597    #[test]
598    fn worker_plans_keep_zero_run_campaign_as_single_empty_plan() {
599        let plans = InvariantCampaignSpec::new(0).worker_plans(4).unwrap();
600
601        assert_eq!(plans, vec![InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 0 }]);
602    }
603
604    #[test]
605    fn worker_plans_reject_zero_workers() {
606        let err = InvariantCampaignSpec::new(1).worker_plans(0).unwrap_err();
607        assert!(err.to_string().contains("requires at least one worker"));
608
609        let err = InvariantCampaignSpec::new(0).worker_plans(0).unwrap_err();
610        assert!(err.to_string().contains("requires at least one worker"));
611    }
612
613    #[test]
614    fn campaign_state_stops_after_terminal_request() {
615        let state = InvariantCampaignState::new(EarlyExit::new(false), None);
616        assert!(!state.should_stop());
617
618        state.request_terminal_stop();
619
620        assert!(state.should_stop());
621    }
622
623    #[test]
624    fn campaign_state_uses_shared_timeout_and_global_throughput() {
625        let state = InvariantCampaignState::new(EarlyExit::new(false), Some(0));
626        std::thread::sleep(Duration::from_millis(1));
627
628        assert!(state.is_timed_campaign());
629        assert!(state.should_stop());
630
631        state.record_call(20);
632        state.record_call(30);
633        assert_eq!(state.throughput_totals(), (2, 50));
634        assert_eq!(state.increment_runs(), 1);
635        assert_eq!(state.total_runs(), 1);
636    }
637
638    #[test]
639    fn aggregator_returns_single_worker_result_without_rewriting() {
640        let spec = InvariantCampaignSpec::new(1);
641        let worker = InvariantWorkerOutput::new(one_worker_plan(1), empty_result(2, 3));
642
643        let mut aggregator = InvariantCampaignAggregator::new(spec);
644        aggregator.push(worker);
645        let result = aggregator.finish().unwrap();
646
647        assert_eq!(result.reverts, 2);
648        assert_eq!(result.failed_corpus_replays, 3);
649    }
650
651    #[test]
652    fn aggregator_accepts_single_worker_output_for_zero_run_campaign() {
653        let spec = InvariantCampaignSpec::new(0);
654        let worker = InvariantWorkerOutput::new(
655            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 0 },
656            empty_result(0, 0),
657        );
658
659        let mut aggregator = InvariantCampaignAggregator::new(spec);
660        aggregator.push(worker);
661        let result = aggregator.finish().unwrap();
662
663        assert_eq!(result.reverts, 0);
664    }
665
666    #[test]
667    fn aggregator_merges_multiple_worker_outputs_in_logical_run_order() {
668        let spec = InvariantCampaignSpec::new(3);
669        let plans = [
670            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
671            InvariantWorkerPlan { worker_id: 1, first_global_run: 1, runs: 1 },
672            InvariantWorkerPlan { worker_id: 2, first_global_run: 2, runs: 1 },
673        ];
674
675        let mut aggregator = InvariantCampaignAggregator::new(spec);
676        aggregator.push(InvariantWorkerOutput::new(
677            plans[2],
678            worker_result(
679                3,
680                0x30,
681                "transfer(address)",
682                InvariantMetrics { calls: 3, reverts: 1, discards: 0 },
683                3,
684                0,
685            ),
686        ));
687        aggregator.push(InvariantWorkerOutput::new(
688            plans[0],
689            worker_result(
690                1,
691                0x10,
692                "transfer(address)",
693                InvariantMetrics { calls: 1, reverts: 0, discards: 2 },
694                1,
695                4,
696            ),
697        ));
698        aggregator.push(InvariantWorkerOutput::new(
699            plans[1],
700            worker_result(
701                2,
702                0x20,
703                "approve(address)",
704                InvariantMetrics { calls: 2, reverts: 1, discards: 1 },
705                2,
706                0,
707            ),
708        ));
709
710        let result = aggregator.finish().unwrap();
711
712        assert_eq!(result.runs, 3);
713        assert_eq!(result.calls, 6);
714        assert_eq!(result.reverts, 6);
715        assert_eq!(result.gas_report_traces.len(), 3);
716        assert_eq!(result.last_run_inputs[0].sender, Address::repeat_byte(0x30));
717
718        let transfer_metrics = result.metrics.get("transfer(address)").unwrap();
719        assert_eq!(transfer_metrics, &InvariantMetrics { calls: 4, reverts: 1, discards: 2 });
720        let approve_metrics = result.metrics.get("approve(address)").unwrap();
721        assert_eq!(approve_metrics, &InvariantMetrics { calls: 2, reverts: 1, discards: 1 });
722
723        let coverage = result.line_coverage.unwrap();
724        assert_eq!(coverage.get(&B256::ZERO).unwrap().get(7).unwrap().get(), 6);
725        assert_eq!(result.failed_corpus_replays, 4);
726    }
727
728    #[test]
729    fn aggregator_preserves_run_and_call_counts() {
730        let spec = InvariantCampaignSpec::new(3);
731        let plans = [
732            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
733            InvariantWorkerPlan { worker_id: 1, first_global_run: 1, runs: 2 },
734        ];
735        let mut first = empty_result(0, 0);
736        first.runs = 1;
737        first.calls = 1000;
738        let mut second = empty_result(0, 0);
739        second.runs = 2;
740        second.calls = 2000;
741
742        let mut aggregator = InvariantCampaignAggregator::new(spec);
743        aggregator.push(InvariantWorkerOutput::new(plans[1], second));
744        aggregator.push(InvariantWorkerOutput::new(plans[0], first));
745        let result = aggregator.finish().unwrap();
746
747        assert_eq!(result.runs, 3);
748        assert_eq!(result.calls, 3000);
749    }
750
751    #[test]
752    fn timeout_aggregator_accepts_partial_outputs_with_range_gaps() {
753        fn result_with_counts(
754            runs: usize,
755            calls: usize,
756            has_last_run: bool,
757            failed_corpus_replays: usize,
758        ) -> InvariantFuzzTestResult {
759            let mut result = empty_result(0, failed_corpus_replays);
760            result.runs = runs;
761            result.calls = calls;
762            result.last_run_inputs = if has_last_run { vec![basic_tx(0x44)] } else { Vec::new() };
763            result
764        }
765
766        let spec = InvariantCampaignSpec::new(10);
767        let outputs = [
768            InvariantWorkerOutput::new(
769                InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 2 },
770                result_with_counts(2, 20, true, 5),
771            ),
772            InvariantWorkerOutput::new(
773                InvariantWorkerPlan { worker_id: 1, first_global_run: 4, runs: 0 },
774                result_with_counts(0, 0, false, 0),
775            ),
776            InvariantWorkerOutput::new(
777                InvariantWorkerPlan { worker_id: 2, first_global_run: 7, runs: 1 },
778                result_with_counts(1, 10, true, 0),
779            ),
780        ];
781
782        let mut strict = InvariantCampaignAggregator::new(spec);
783        for output in outputs {
784            strict.push(output);
785        }
786        let err = strict.finish().unwrap_err();
787        assert!(err.to_string().contains("do not cover the logical campaign"));
788
789        let mut partial = InvariantCampaignAggregator::new(spec);
790        partial.push(InvariantWorkerOutput::new(
791            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 2 },
792            result_with_counts(2, 20, true, 5),
793        ));
794        partial.push(InvariantWorkerOutput::new(
795            InvariantWorkerPlan { worker_id: 1, first_global_run: 4, runs: 0 },
796            result_with_counts(0, 0, false, 0),
797        ));
798        partial.push(InvariantWorkerOutput::new(
799            InvariantWorkerPlan { worker_id: 2, first_global_run: 7, runs: 1 },
800            result_with_counts(1, 10, true, 0),
801        ));
802
803        let (result, corpus_entries) = partial.finish_partial_with_corpus_entries().unwrap();
804
805        assert_eq!(result.runs, 3);
806        assert_eq!(result.calls, 30);
807        assert_eq!(result.failed_corpus_replays, 5);
808        assert!(corpus_entries.is_empty());
809    }
810
811    #[test]
812    fn aggregator_keeps_earlier_predicate_failure_for_each_invariant() {
813        let spec = InvariantCampaignSpec::new(2);
814        let plans = [
815            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
816            InvariantWorkerPlan { worker_id: 1, first_global_run: 1, runs: 1 },
817        ];
818        let mut earlier = empty_result(0, 0);
819        earlier.errors.insert("invariant_balance".to_string(), predicate_error("earlier", 3));
820        let mut later = empty_result(0, 0);
821        later.errors.insert("invariant_balance".to_string(), predicate_error("later", 1));
822
823        let mut aggregator = InvariantCampaignAggregator::new(spec);
824        aggregator.push(InvariantWorkerOutput::new(plans[1], later));
825        aggregator.push(InvariantWorkerOutput::new(plans[0], earlier));
826        let result = aggregator.finish().unwrap();
827
828        assert_eq!(result.errors.len(), 1);
829        assert_eq!(result.errors["invariant_balance"].revert_reason().as_deref(), Some("earlier"));
830    }
831
832    #[test]
833    fn aggregator_dedupes_handler_assertions_by_site_and_keeps_shorter_sequence() {
834        let spec = InvariantCampaignSpec::new(2);
835        let plans = [
836            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
837            InvariantWorkerPlan { worker_id: 1, first_global_run: 1, runs: 1 },
838        ];
839        let site = (Address::repeat_byte(0xaa), Selector::from([1, 2, 3, 4]));
840        let mut longer = empty_result(0, 0);
841        longer.handler_errors.insert(site, handler_error(site.0, site.1, 4, "longer"));
842        let mut shorter = empty_result(0, 0);
843        shorter.handler_errors.insert(site, handler_error(site.0, site.1, 2, "shorter"));
844
845        let mut aggregator = InvariantCampaignAggregator::new(spec);
846        aggregator.push(InvariantWorkerOutput::new(plans[1], shorter));
847        aggregator.push(InvariantWorkerOutput::new(plans[0], longer));
848        let result = aggregator.finish().unwrap();
849
850        let failure = result.handler_errors[&site].as_handler_assertion().unwrap();
851        assert_eq!(result.handler_errors.len(), 1);
852        assert_eq!(failure.call_sequence.len(), 2);
853        assert_eq!(failure.revert_reason, "shorter");
854    }
855
856    #[test]
857    fn aggregator_keeps_earlier_handler_assertion_when_lengths_tie() {
858        let spec = InvariantCampaignSpec::new(2);
859        let plans = [
860            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
861            InvariantWorkerPlan { worker_id: 1, first_global_run: 1, runs: 1 },
862        ];
863        let site = (Address::repeat_byte(0xaa), Selector::from([1, 2, 3, 4]));
864        let mut earlier = empty_result(0, 0);
865        earlier.handler_errors.insert(site, handler_error(site.0, site.1, 2, "earlier"));
866        let mut later = empty_result(0, 0);
867        later.handler_errors.insert(site, handler_error(site.0, site.1, 2, "later"));
868
869        let mut aggregator = InvariantCampaignAggregator::new(spec);
870        aggregator.push(InvariantWorkerOutput::new(plans[1], later));
871        aggregator.push(InvariantWorkerOutput::new(plans[0], earlier));
872        let result = aggregator.finish().unwrap();
873
874        let failure = result.handler_errors[&site].as_handler_assertion().unwrap();
875        assert_eq!(result.handler_errors.len(), 1);
876        assert_eq!(failure.call_sequence.len(), 2);
877        assert_eq!(failure.revert_reason, "earlier");
878    }
879
880    #[test]
881    fn aggregator_keeps_distinct_predicate_failures() {
882        let spec = InvariantCampaignSpec::new(2);
883        let plans = [
884            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
885            InvariantWorkerPlan { worker_id: 1, first_global_run: 1, runs: 1 },
886        ];
887        let mut earlier = empty_result(0, 0);
888        earlier.errors.insert("invariant_a".to_string(), predicate_error("a", 3));
889        let mut later = empty_result(0, 0);
890        later.errors.insert("invariant_b".to_string(), predicate_error("b", 2));
891
892        let mut aggregator = InvariantCampaignAggregator::new(spec);
893        aggregator.push(InvariantWorkerOutput::new(plans[1], later));
894        aggregator.push(InvariantWorkerOutput::new(plans[0], earlier));
895        let result = aggregator.finish().unwrap();
896
897        assert_eq!(result.errors.len(), 2);
898        assert_eq!(result.errors["invariant_a"].revert_reason().as_deref(), Some("a"));
899        assert_eq!(result.errors["invariant_b"].revert_reason().as_deref(), Some("b"));
900    }
901
902    #[test]
903    fn aggregator_keeps_first_max_optimization_value_on_tie() {
904        let spec = InvariantCampaignSpec::new(3);
905        let plans = [
906            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
907            InvariantWorkerPlan { worker_id: 1, first_global_run: 1, runs: 1 },
908            InvariantWorkerPlan { worker_id: 2, first_global_run: 2, runs: 1 },
909        ];
910        let mut first = empty_result(0, 0);
911        first.optimization_best_value = Some(I256::try_from(7).unwrap());
912        first.optimization_best_sequence = sequence(1, 0x10);
913        let mut earlier_best = empty_result(0, 0);
914        earlier_best.optimization_best_value = Some(I256::try_from(9).unwrap());
915        earlier_best.optimization_best_sequence = sequence(1, 0x20);
916        let mut later_tie = empty_result(0, 0);
917        later_tie.optimization_best_value = Some(I256::try_from(9).unwrap());
918        later_tie.optimization_best_sequence = sequence(1, 0x30);
919
920        let mut aggregator = InvariantCampaignAggregator::new(spec);
921        aggregator.push(InvariantWorkerOutput::new(plans[2], later_tie));
922        aggregator.push(InvariantWorkerOutput::new(plans[0], first));
923        aggregator.push(InvariantWorkerOutput::new(plans[1], earlier_best));
924        let result = aggregator.finish().unwrap();
925
926        assert_eq!(result.optimization_best_value, Some(I256::try_from(9).unwrap()));
927        assert_eq!(result.optimization_best_sequence[0].sender, Address::repeat_byte(0x20));
928    }
929
930    #[test]
931    fn aggregator_rejects_overlapping_outputs() {
932        let spec = InvariantCampaignSpec::new(1);
933        let mut aggregator = InvariantCampaignAggregator::new(spec);
934
935        aggregator.push(InvariantWorkerOutput::new(
936            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
937            empty_result(0, 0),
938        ));
939        aggregator.push(InvariantWorkerOutput::new(
940            InvariantWorkerPlan { worker_id: 1, first_global_run: 0, runs: 1 },
941            empty_result(0, 0),
942        ));
943        let err = aggregator.finish().unwrap_err();
944
945        assert!(err.to_string().contains("do not cover the logical campaign"));
946    }
947
948    #[test]
949    fn aggregator_rejects_duplicate_worker_ids() {
950        let spec = InvariantCampaignSpec::new(2);
951        let mut aggregator = InvariantCampaignAggregator::new(spec);
952
953        aggregator.push(InvariantWorkerOutput::new(
954            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
955            empty_result(0, 0),
956        ));
957        aggregator.push(InvariantWorkerOutput::new(
958            InvariantWorkerPlan { worker_id: 0, first_global_run: 1, runs: 1 },
959            empty_result(0, 0),
960        ));
961        let err = aggregator.finish().unwrap_err();
962
963        assert!(err.to_string().contains("duplicate invariant worker output"));
964    }
965
966    #[test]
967    fn aggregator_allows_non_dense_worker_ids_with_contiguous_ranges() {
968        let spec = InvariantCampaignSpec::new(2);
969        let mut aggregator = InvariantCampaignAggregator::new(spec);
970
971        aggregator.push(InvariantWorkerOutput::new(
972            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
973            empty_result(0, 0),
974        ));
975        aggregator.push(InvariantWorkerOutput::new(
976            InvariantWorkerPlan { worker_id: 2, first_global_run: 1, runs: 1 },
977            empty_result(2, 0),
978        ));
979        let result = aggregator.finish().unwrap();
980
981        assert_eq!(result.reverts, 2);
982    }
983
984    #[test]
985    fn aggregator_rejects_missing_master_worker() {
986        let spec = InvariantCampaignSpec::new(2);
987        let mut aggregator = InvariantCampaignAggregator::new(spec);
988
989        aggregator.push(InvariantWorkerOutput::new(
990            InvariantWorkerPlan { worker_id: 1, first_global_run: 0, runs: 1 },
991            empty_result(0, 0),
992        ));
993        aggregator.push(InvariantWorkerOutput::new(
994            InvariantWorkerPlan { worker_id: 2, first_global_run: 1, runs: 1 },
995            empty_result(0, 0),
996        ));
997        let err = aggregator.finish().unwrap_err();
998
999        assert!(err.to_string().contains("missing invariant master worker output"));
1000    }
1001
1002    #[test]
1003    fn aggregator_uses_master_failed_corpus_replays() {
1004        let spec = InvariantCampaignSpec::new(2);
1005        let plans = [
1006            InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 },
1007            InvariantWorkerPlan { worker_id: 1, first_global_run: 1, runs: 1 },
1008        ];
1009
1010        let mut aggregator = InvariantCampaignAggregator::new(spec);
1011        aggregator.push(InvariantWorkerOutput::new(plans[0], empty_result(0, 7)));
1012        aggregator.push(InvariantWorkerOutput::new(plans[1], empty_result(0, 1)));
1013        let result = aggregator.finish().unwrap();
1014
1015        assert_eq!(result.failed_corpus_replays, 7);
1016    }
1017
1018    #[test]
1019    fn aggregator_uses_master_failed_corpus_replays_independent_of_output_order() {
1020        let spec = InvariantCampaignSpec::new(2);
1021        let plans = [
1022            InvariantWorkerPlan { worker_id: 1, first_global_run: 0, runs: 1 },
1023            InvariantWorkerPlan { worker_id: 0, first_global_run: 1, runs: 1 },
1024        ];
1025
1026        let mut aggregator = InvariantCampaignAggregator::new(spec);
1027        aggregator.push(InvariantWorkerOutput::new(plans[0], empty_result(0, 0)));
1028        aggregator.push(InvariantWorkerOutput::new(plans[1], empty_result(0, 7)));
1029        let result = aggregator.finish().unwrap();
1030
1031        assert_eq!(result.failed_corpus_replays, 7);
1032    }
1033
1034    #[test]
1035    fn aggregator_rejects_plan_that_does_not_cover_campaign() {
1036        let spec = InvariantCampaignSpec::new(2);
1037        let plan = InvariantWorkerPlan { worker_id: 0, first_global_run: 0, runs: 1 };
1038        let worker = InvariantWorkerOutput::new(plan, empty_result(0, 0));
1039
1040        let mut aggregator = InvariantCampaignAggregator::new(spec);
1041        aggregator.push(worker);
1042        let err = aggregator.finish().unwrap_err();
1043
1044        assert!(err.to_string().contains("do not cover the logical campaign"));
1045    }
1046
1047    #[test]
1048    fn aggregator_rejects_missing_output() {
1049        let aggregator = InvariantCampaignAggregator::new(InvariantCampaignSpec::new(1));
1050        let err = aggregator.finish().unwrap_err();
1051
1052        assert!(err.to_string().contains("missing invariant worker output"));
1053    }
1054}