foundry_evm/executors/
corpus.rs

//! Corpus management for parallel fuzzing with coverage-guided mutation.
//!
//! This module implements a corpus-based fuzzing system that stores, mutates, and shares
//! transaction sequences across multiple fuzzing workers. Each corpus entry represents a
//! sequence of transactions that has produced interesting coverage, and can be mutated to
//! discover new execution paths.
//!
//! ## File System Structure
//!
//! The corpus is organized on disk as follows:
//!
//! ```text
//! <corpus_dir>/
//! ├── worker0/                  # Master (worker 0) directory
//! │   ├── corpus/               # Master's corpus entries
//! │   │   ├── <uuid>-<timestamp>.json          # Corpus entry (small, or gzip disabled)
//! │   │   └── <uuid>-<timestamp>.json.gz       # Corpus entry (large, gzip-compressed)
//! │   └── sync/                 # Directory where other workers export new findings
//! │       └── <uuid>-<timestamp>.json          # New entries from other workers
//! └── workerN/                  # Worker N's directory
//!     ├── corpus/               # Worker N's local corpus
//!     │   └── ...
//!     └── sync/                 # Worker N's sync directory
//!         └── ...
//! ```
//!
//! ## Workflow
//!
//! - Each worker maintains its own local corpus, with entries stored as JSON files.
//! - Workers export new interesting entries to the master's sync directory via hard links.
//! - The master (worker0) imports new entries from its sync directory and exports them to all
//!   the other workers.
//! - Workers sync with the master to receive new corpus entries from other workers.
//! - All of this happens periodically; there is no fixed order in which workers export or import
//!   entries, since it doesn't matter as long as the corpus eventually syncs across all workers.
36
37use crate::executors::{Executor, RawCallResult, invariant::execute_tx};
38use alloy_dyn_abi::JsonAbiExt;
39use alloy_json_abi::Function;
40use alloy_primitives::Bytes;
41use eyre::{Result, eyre};
42use foundry_config::FuzzCorpusConfig;
43use foundry_evm_fuzz::{
44    BasicTxDetails,
45    invariant::FuzzRunIdentifiedContracts,
46    strategies::{EvmFuzzState, mutate_param_value},
47};
48use proptest::{
49    prelude::{Just, Rng, Strategy},
50    prop_oneof,
51    strategy::{BoxedStrategy, ValueTree},
52    test_runner::TestRunner,
53};
54use serde::Serialize;
55use std::{
56    fmt,
57    path::{Path, PathBuf},
58    sync::{
59        Arc,
60        atomic::{AtomicUsize, Ordering},
61    },
62    time::{SystemTime, UNIX_EPOCH},
63};
64use uuid::Uuid;
65
/// Prefix of every worker directory name (`worker<id>`).
const WORKER: &str = "worker";
/// Sub-directory of a worker directory that holds its persisted corpus entries.
const CORPUS_DIR: &str = "corpus";
/// Sub-directory of a worker directory that holds entries waiting to be imported.
const SYNC_DIR: &str = "sync";

/// An entry is favored when its ratio of new finds to mutations is strictly above this value.
const FAVORABILITY_THRESHOLD: f64 = 0.3;
/// Length, in bytes, of the edge-coverage history map.
const COVERAGE_MAP_SIZE: usize = 1 << 16;

/// Estimated serialized size, in bytes, above which corpus entries are gzip-compressed.
/// 4KiB is usually the minimum file size on popular file systems.
const GZIP_THRESHOLD: usize = 4096;
76
/// The ways a new call sequence can be derived from one or two corpus entries.
#[derive(Clone, Debug)]
enum MutationType {
    /// Concatenate a random slice of one sequence with a random slice of another.
    Splice,
    /// Overwrite a random range of a sequence with copies of one of its calls.
    Repeat,
    /// Walk two sequences in lockstep, taking each call from either one at random.
    Interleave,
    /// Replace a random-length prefix of a sequence with freshly generated calls.
    Prefix,
    /// Replace a random-length suffix of a sequence with freshly generated calls.
    Suffix,
    /// ABI-mutate random arguments of one randomly selected call.
    Abi,
}
93
94/// Holds Corpus information.
95#[derive(Clone, Serialize)]
96struct CorpusEntry {
97    // Unique corpus identifier.
98    uuid: Uuid,
99    // Total mutations of corpus as primary source.
100    total_mutations: usize,
101    // New coverage found as a result of mutating this corpus.
102    new_finds_produced: usize,
103    // Corpus call sequence.
104    #[serde(skip_serializing)]
105    tx_seq: Vec<BasicTxDetails>,
106    // Whether this corpus is favored, i.e. producing new finds more often than
107    // `FAVORABILITY_THRESHOLD`.
108    is_favored: bool,
109    /// Timestamp of when this entry was written to disk in seconds.
110    #[serde(skip_serializing)]
111    timestamp: u64,
112}
113
114impl CorpusEntry {
115    /// Creates a corpus entry with a new UUID.
116    pub fn new(tx_seq: Vec<BasicTxDetails>) -> Self {
117        Self::new_with(tx_seq, Uuid::new_v4())
118    }
119
120    /// Creates a corpus entry with a path.
121    /// The UUID is parsed from the file name, otherwise a new UUID is generated.
122    pub fn new_existing(tx_seq: Vec<BasicTxDetails>, path: PathBuf) -> Result<Self> {
123        let Some(name) = path.file_name().and_then(|s| s.to_str()) else {
124            eyre::bail!("invalid corpus file path: {path:?}");
125        };
126        let uuid = parse_corpus_filename(name)?.0;
127        Ok(Self::new_with(tx_seq, uuid))
128    }
129
130    /// Creates a corpus entry with the given UUID.
131    pub fn new_with(tx_seq: Vec<BasicTxDetails>, uuid: Uuid) -> Self {
132        Self {
133            uuid,
134            total_mutations: 0,
135            new_finds_produced: 0,
136            tx_seq,
137            is_favored: false,
138            timestamp: SystemTime::now()
139                .duration_since(UNIX_EPOCH)
140                .expect("time went backwards")
141                .as_secs(),
142        }
143    }
144
145    fn write_to_disk_in(&self, dir: &Path, can_gzip: bool) -> foundry_common::fs::Result<()> {
146        let file_name = self.file_name(can_gzip);
147        let path = dir.join(file_name);
148        if self.should_gzip(can_gzip) {
149            foundry_common::fs::write_json_gzip_file(&path, &self.tx_seq)
150        } else {
151            foundry_common::fs::write_json_file(&path, &self.tx_seq)
152        }
153    }
154
155    fn file_name(&self, can_gzip: bool) -> String {
156        let ext = if self.should_gzip(can_gzip) { ".json.gz" } else { ".json" };
157        format!("{}-{}{ext}", self.uuid, self.timestamp)
158    }
159
160    fn should_gzip(&self, can_gzip: bool) -> bool {
161        if !can_gzip {
162            return false;
163        }
164        let size: usize = self.tx_seq.iter().map(|tx| tx.estimate_serialized_size()).sum();
165        size > GZIP_THRESHOLD
166    }
167}
168
/// Corpus metrics shared across workers.
///
/// The counters are atomics so they can be updated through a shared reference.
#[derive(Default)]
pub(crate) struct GlobalCorpusMetrics {
    /// Edges seen during the invariant run.
    cumulative_edges_seen: AtomicUsize,
    /// Features (a new hitcount bin of an already-hit edge) seen during the invariant run.
    cumulative_features_seen: AtomicUsize,
    /// Corpus entries.
    corpus_count: AtomicUsize,
    /// Favored corpus entries.
    favored_items: AtomicUsize,
}
180
181impl fmt::Display for GlobalCorpusMetrics {
182    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183        self.load().fmt(f)
184    }
185}
186
187impl GlobalCorpusMetrics {
188    pub(crate) fn load(&self) -> CorpusMetrics {
189        CorpusMetrics {
190            cumulative_edges_seen: self.cumulative_edges_seen.load(Ordering::Relaxed),
191            cumulative_features_seen: self.cumulative_features_seen.load(Ordering::Relaxed),
192            corpus_count: self.corpus_count.load(Ordering::Relaxed),
193            favored_items: self.favored_items.load(Ordering::Relaxed),
194        }
195    }
196}
197
198#[derive(Serialize, Default, Clone)]
199pub(crate) struct CorpusMetrics {
200    // Number of edges seen during the invariant run.
201    cumulative_edges_seen: usize,
202    // Number of features (new hitcount bin of previously hit edge) seen during the invariant run.
203    cumulative_features_seen: usize,
204    // Number of corpus entries.
205    corpus_count: usize,
206    // Number of corpus entries that are favored.
207    favored_items: usize,
208}
209
210impl fmt::Display for CorpusMetrics {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        writeln!(f)?;
213        writeln!(f, "        - cumulative edges seen: {}", self.cumulative_edges_seen)?;
214        writeln!(f, "        - cumulative features seen: {}", self.cumulative_features_seen)?;
215        writeln!(f, "        - corpus count: {}", self.corpus_count)?;
216        write!(f, "        - favored items: {}", self.favored_items)?;
217        Ok(())
218    }
219}
220
221impl CorpusMetrics {
222    /// Records number of new edges or features explored during the campaign.
223    pub fn update_seen(&mut self, is_edge: bool) {
224        if is_edge {
225            self.cumulative_edges_seen += 1;
226        } else {
227            self.cumulative_features_seen += 1;
228        }
229    }
230
231    /// Updates campaign favored items.
232    pub fn update_favored(&mut self, is_favored: bool, corpus_favored: bool) {
233        if is_favored && !corpus_favored {
234            self.favored_items += 1;
235        } else if !is_favored && corpus_favored {
236            self.favored_items -= 1;
237        }
238    }
239}
240
241/// Per-worker corpus manager.
242pub struct WorkerCorpus {
243    /// Worker Id
244    id: usize,
245    /// In-memory corpus entries populated from the persisted files and
246    /// runs administered by this worker.
247    in_memory_corpus: Vec<CorpusEntry>,
248    /// History of binned hitcount of edges seen during fuzzing
249    history_map: Vec<u8>,
250    /// Number of failed replays from initial corpus
251    pub(crate) failed_replays: usize,
252    /// Worker Metrics
253    pub(crate) metrics: CorpusMetrics,
254    /// Fuzzed calls generator.
255    tx_generator: BoxedStrategy<BasicTxDetails>,
256    /// Call sequence mutation strategy type generator used by stateful fuzzing.
257    mutation_generator: BoxedStrategy<MutationType>,
258    /// Identifier of current mutated entry for this worker.
259    current_mutated: Option<Uuid>,
260    /// Config
261    config: Arc<FuzzCorpusConfig>,
262    /// Indices of new entries added to [`WorkerCorpus::in_memory_corpus`] since last sync.
263    new_entry_indices: Vec<usize>,
264    /// Last sync timestamp in seconds.
265    last_sync_timestamp: u64,
266    /// Worker Dir
267    /// corpus_dir/worker1/
268    worker_dir: Option<PathBuf>,
269    /// Metrics at last sync - used to calculate deltas while syncing with global metrics
270    last_sync_metrics: CorpusMetrics,
271}
272
273impl WorkerCorpus {
274    pub fn new(
275        id: usize,
276        config: FuzzCorpusConfig,
277        tx_generator: BoxedStrategy<BasicTxDetails>,
278        // Only required by master worker (id = 0) to replay existing corpus.
279        executor: Option<&Executor>,
280        fuzzed_function: Option<&Function>,
281        fuzzed_contracts: Option<&FuzzRunIdentifiedContracts>,
282    ) -> Result<Self> {
283        let mutation_generator = prop_oneof![
284            Just(MutationType::Splice),
285            Just(MutationType::Repeat),
286            Just(MutationType::Interleave),
287            Just(MutationType::Prefix),
288            Just(MutationType::Suffix),
289            Just(MutationType::Abi),
290        ]
291        .boxed();
292
293        let worker_dir = config.corpus_dir.as_ref().map(|corpus_dir| {
294            let worker_dir = corpus_dir.join(format!("{WORKER}{id}"));
295            let worker_corpus = worker_dir.join(CORPUS_DIR);
296            let sync_dir = worker_dir.join(SYNC_DIR);
297
298            // Create the necessary directories for the worker.
299            let _ = foundry_common::fs::create_dir_all(&worker_corpus);
300            let _ = foundry_common::fs::create_dir_all(&sync_dir);
301
302            worker_dir
303        });
304
305        let mut in_memory_corpus = vec![];
306        let mut history_map = vec![0u8; COVERAGE_MAP_SIZE];
307        let mut metrics = CorpusMetrics::default();
308        let mut failed_replays = 0;
309
310        if id == 0
311            && let Some(corpus_dir) = &config.corpus_dir
312        {
313            // Master worker loads the initial corpus, if it exists.
314            // Then, [distribute]s it to workers.
315            let executor = executor.expect("Executor required for master worker");
316            'corpus_replay: for entry in read_corpus_dir(corpus_dir) {
317                let tx_seq = entry.read_tx_seq()?;
318                if tx_seq.is_empty() {
319                    continue;
320                }
321                // Warm up history map from loaded sequences.
322                let mut executor = executor.clone();
323                for tx in &tx_seq {
324                    if Self::can_replay_tx(tx, fuzzed_function, fuzzed_contracts) {
325                        let mut call_result = execute_tx(&mut executor, tx)?;
326                        let (new_coverage, is_edge) =
327                            call_result.merge_edge_coverage(&mut history_map);
328                        if new_coverage {
329                            metrics.update_seen(is_edge);
330                        }
331
332                        // Commit only when running invariant / stateful tests.
333                        if fuzzed_contracts.is_some() {
334                            executor.commit(&mut call_result);
335                        }
336                    } else {
337                        failed_replays += 1;
338
339                        // If the only input for fuzzed function cannot be replied, then move to
340                        // next one without adding it in memory.
341                        if fuzzed_function.is_some() {
342                            continue 'corpus_replay;
343                        }
344                    }
345                }
346
347                metrics.corpus_count += 1;
348
349                debug!(
350                    target: "corpus",
351                    "load sequence with len {} from corpus file {}",
352                    tx_seq.len(),
353                    entry.path.display()
354                );
355
356                // Populate in memory corpus with the sequence from corpus file.
357                in_memory_corpus.push(CorpusEntry::new_with(tx_seq, entry.uuid));
358            }
359        }
360
361        Ok(Self {
362            id,
363            in_memory_corpus,
364            history_map,
365            failed_replays,
366            metrics,
367            tx_generator,
368            mutation_generator,
369            current_mutated: None,
370            config: config.into(),
371            new_entry_indices: Default::default(),
372            last_sync_timestamp: 0,
373            worker_dir,
374            last_sync_metrics: Default::default(),
375        })
376    }
377
378    /// Updates stats for the given call sequence, if new coverage produced.
379    /// Persists the call sequence (if corpus directory is configured and new coverage) and updates
380    /// in-memory corpus.
381    #[instrument(skip_all)]
382    pub fn process_inputs(&mut self, inputs: &[BasicTxDetails], new_coverage: bool) {
383        let Some(worker_corpus) = &self.worker_dir else {
384            return;
385        };
386        let worker_corpus = worker_corpus.join(CORPUS_DIR);
387
388        // Update stats of current mutated primary corpus.
389        if let Some(uuid) = &self.current_mutated {
390            if let Some(corpus) =
391                self.in_memory_corpus.iter_mut().find(|corpus| corpus.uuid == *uuid)
392            {
393                corpus.total_mutations += 1;
394                if new_coverage {
395                    corpus.new_finds_produced += 1
396                }
397                let is_favored = (corpus.new_finds_produced as f64 / corpus.total_mutations as f64)
398                    > FAVORABILITY_THRESHOLD;
399                self.metrics.update_favored(is_favored, corpus.is_favored);
400                corpus.is_favored = is_favored;
401
402                trace!(
403                    target: "corpus",
404                    "updated corpus {}, total mutations: {}, new finds: {}",
405                    corpus.uuid, corpus.total_mutations, corpus.new_finds_produced
406                );
407            }
408
409            self.current_mutated = None;
410        }
411
412        // Collect inputs only if current run produced new coverage.
413        if !new_coverage {
414            return;
415        }
416
417        assert!(!inputs.is_empty());
418        let corpus = CorpusEntry::new(inputs.to_vec());
419
420        // Persist to disk.
421        let write_result = corpus.write_to_disk_in(&worker_corpus, self.config.corpus_gzip);
422        if let Err(err) = write_result {
423            debug!(target: "corpus", %err, "failed to record call sequence {:?}", corpus.tx_seq);
424        } else {
425            trace!(
426                target: "corpus",
427                "persisted {} inputs for new coverage for {} corpus",
428                corpus.tx_seq.len(),
429                corpus.uuid,
430            );
431        }
432
433        // Track in-memory corpus changes to update MasterWorker on sync.
434        let new_index = self.in_memory_corpus.len();
435        self.new_entry_indices.push(new_index);
436
437        // This includes reverting txs in the corpus and `can_continue` removes
438        // them. We want this as it is new coverage and may help reach the other branch.
439        self.metrics.corpus_count += 1;
440        self.in_memory_corpus.push(corpus);
441    }
442
443    /// Collects coverage from call result and updates metrics.
444    pub fn merge_edge_coverage(&mut self, call_result: &mut RawCallResult) -> bool {
445        if !self.config.collect_edge_coverage() {
446            return false;
447        }
448
449        let (new_coverage, is_edge) = call_result.merge_edge_coverage(&mut self.history_map);
450        if new_coverage {
451            self.metrics.update_seen(is_edge);
452        }
453        new_coverage
454    }
455
456    /// Generates new call sequence from in memory corpus. Evicts oldest corpus mutated more than
457    /// configured max mutations value. Used by invariant test campaigns.
458    #[instrument(skip_all)]
459    pub fn new_inputs(
460        &mut self,
461        test_runner: &mut TestRunner,
462        fuzz_state: &EvmFuzzState,
463        targeted_contracts: &FuzzRunIdentifiedContracts,
464    ) -> Result<Vec<BasicTxDetails>> {
465        let mut new_seq = vec![];
466
467        // Early return with first_input only if corpus dir / coverage guided fuzzing not
468        // configured.
469        if !self.config.is_coverage_guided() {
470            new_seq.push(self.new_tx(test_runner)?);
471            return Ok(new_seq);
472        };
473
474        if !self.in_memory_corpus.is_empty() {
475            self.evict_oldest_corpus()?;
476
477            let mutation_type = self
478                .mutation_generator
479                .new_tree(test_runner)
480                .map_err(|err| eyre!("Could not generate mutation type {err}"))?
481                .current();
482
483            let rng = test_runner.rng();
484            let corpus_len = self.in_memory_corpus.len();
485            let primary = &self.in_memory_corpus[rng.random_range(0..corpus_len)];
486            let secondary = &self.in_memory_corpus[rng.random_range(0..corpus_len)];
487
488            match mutation_type {
489                MutationType::Splice => {
490                    trace!(target: "corpus", "splice {} and {}", primary.uuid, secondary.uuid);
491
492                    self.current_mutated = Some(primary.uuid);
493
494                    let start1 = rng.random_range(0..primary.tx_seq.len());
495                    let end1 = rng.random_range(start1..primary.tx_seq.len());
496
497                    let start2 = rng.random_range(0..secondary.tx_seq.len());
498                    let end2 = rng.random_range(start2..secondary.tx_seq.len());
499
500                    for tx in primary.tx_seq.iter().take(end1).skip(start1) {
501                        new_seq.push(tx.clone());
502                    }
503                    for tx in secondary.tx_seq.iter().take(end2).skip(start2) {
504                        new_seq.push(tx.clone());
505                    }
506                }
507                MutationType::Repeat => {
508                    let corpus = if rng.random::<bool>() { primary } else { secondary };
509                    trace!(target: "corpus", "repeat {}", corpus.uuid);
510
511                    self.current_mutated = Some(corpus.uuid);
512
513                    new_seq = corpus.tx_seq.clone();
514                    let start = rng.random_range(0..corpus.tx_seq.len());
515                    let end = rng.random_range(start..corpus.tx_seq.len());
516                    let item_idx = rng.random_range(0..corpus.tx_seq.len());
517                    let repeated = vec![new_seq[item_idx].clone(); end - start];
518                    new_seq.splice(start..end, repeated);
519                }
520                MutationType::Interleave => {
521                    trace!(target: "corpus", "interleave {} with {}", primary.uuid, secondary.uuid);
522
523                    self.current_mutated = Some(primary.uuid);
524
525                    for (tx1, tx2) in primary.tx_seq.iter().zip(secondary.tx_seq.iter()) {
526                        // TODO: chunks?
527                        let tx = if rng.random::<bool>() { tx1.clone() } else { tx2.clone() };
528                        new_seq.push(tx);
529                    }
530                }
531                MutationType::Prefix => {
532                    let corpus = if rng.random::<bool>() { primary } else { secondary };
533                    trace!(target: "corpus", "overwrite prefix of {}", corpus.uuid);
534
535                    self.current_mutated = Some(corpus.uuid);
536
537                    new_seq = corpus.tx_seq.clone();
538                    for i in 0..rng.random_range(0..=new_seq.len()) {
539                        new_seq[i] = self.new_tx(test_runner)?;
540                    }
541                }
542                MutationType::Suffix => {
543                    let corpus = if rng.random::<bool>() { primary } else { secondary };
544                    trace!(target: "corpus", "overwrite suffix of {}", corpus.uuid);
545
546                    self.current_mutated = Some(corpus.uuid);
547
548                    new_seq = corpus.tx_seq.clone();
549                    for i in new_seq.len() - rng.random_range(0..new_seq.len())..corpus.tx_seq.len()
550                    {
551                        new_seq[i] = self.new_tx(test_runner)?;
552                    }
553                }
554                MutationType::Abi => {
555                    let targets = targeted_contracts.targets.lock();
556                    let corpus = if rng.random::<bool>() { primary } else { secondary };
557                    trace!(target: "corpus", "ABI mutate args of {}", corpus.uuid);
558
559                    self.current_mutated = Some(corpus.uuid);
560
561                    new_seq = corpus.tx_seq.clone();
562
563                    let idx = rng.random_range(0..new_seq.len());
564                    let tx = new_seq.get_mut(idx).unwrap();
565                    if let (_, Some(function)) = targets.fuzzed_artifacts(tx) {
566                        // TODO: add call_value to call details and mutate it as well as sender some
567                        // of the time.
568                        if !function.inputs.is_empty() {
569                            self.abi_mutate(tx, function, test_runner, fuzz_state)?;
570                        }
571                    }
572                }
573            }
574        }
575
576        // Make sure the new sequence contains at least one tx to start fuzzing from.
577        if new_seq.is_empty() {
578            new_seq.push(self.new_tx(test_runner)?);
579        }
580        trace!(target: "corpus", "new sequence of {} calls generated", new_seq.len());
581
582        Ok(new_seq)
583    }
584
585    /// Generates a new input from the shared in memory corpus.  Evicts oldest corpus mutated more
586    /// than configured max mutations value. Used by fuzz (stateless) test campaigns.
587    #[instrument(skip_all)]
588    pub fn new_input(
589        &mut self,
590        test_runner: &mut TestRunner,
591        fuzz_state: &EvmFuzzState,
592        function: &Function,
593    ) -> Result<Bytes> {
594        // Early return if not running with coverage guided fuzzing.
595        if !self.config.is_coverage_guided() {
596            return Ok(self.new_tx(test_runner)?.call_details.calldata);
597        }
598
599        self.evict_oldest_corpus()?;
600
601        let tx = if !self.in_memory_corpus.is_empty() {
602            let corpus = &self.in_memory_corpus
603                [test_runner.rng().random_range(0..self.in_memory_corpus.len())];
604            self.current_mutated = Some(corpus.uuid);
605            let mut tx = corpus.tx_seq.first().unwrap().clone();
606            self.abi_mutate(&mut tx, function, test_runner, fuzz_state)?;
607            tx
608        } else {
609            self.new_tx(test_runner)?
610        };
611
612        Ok(tx.call_details.calldata)
613    }
614
615    /// Generates single call from corpus strategy.
616    pub fn new_tx(&self, test_runner: &mut TestRunner) -> Result<BasicTxDetails> {
617        Ok(self
618            .tx_generator
619            .new_tree(test_runner)
620            .map_err(|_| eyre!("Could not generate case"))?
621            .current())
622    }
623
624    /// Returns the next call to be used in call sequence.
625    /// If coverage guided fuzzing is not configured or if previous input was discarded then this is
626    /// a new tx from strategy.
627    /// If running with coverage guided fuzzing it returns a new call only when sequence
628    /// does not have enough entries, or randomly. Otherwise, returns the next call from initial
629    /// sequence.
630    pub fn generate_next_input(
631        &mut self,
632        test_runner: &mut TestRunner,
633        sequence: &[BasicTxDetails],
634        discarded: bool,
635        depth: usize,
636    ) -> Result<BasicTxDetails> {
637        // Early return with new input if corpus dir / coverage guided fuzzing not configured or if
638        // call was discarded.
639        if self.config.corpus_dir.is_none() || discarded {
640            return self.new_tx(test_runner);
641        }
642
643        // When running with coverage guided fuzzing enabled then generate new sequence if initial
644        // sequence's length is less than depth or randomly, to occasionally intermix new txs.
645        if depth > sequence.len().saturating_sub(1) || test_runner.rng().random_ratio(1, 10) {
646            return self.new_tx(test_runner);
647        }
648
649        // Continue with the next call initial sequence.
650        Ok(sequence[depth].clone())
651    }
652
653    /// Flush the oldest corpus mutated more than configured max mutations unless they are
654    /// favored.
655    fn evict_oldest_corpus(&mut self) -> Result<()> {
656        if self.in_memory_corpus.len() > self.config.corpus_min_size.max(1)
657            && let Some(index) = self.in_memory_corpus.iter().position(|corpus| {
658                corpus.total_mutations > self.config.corpus_min_mutations && !corpus.is_favored
659            })
660        {
661            let corpus = &self.in_memory_corpus[index];
662
663            trace!(target: "corpus", corpus=%serde_json::to_string(&corpus).unwrap(), "evict corpus");
664
665            // Remove corpus from memory.
666            self.in_memory_corpus.remove(index);
667
668            // Adjust the tracked indices.
669            self.new_entry_indices.retain_mut(|i| {
670                if *i > index {
671                    *i -= 1; // Shift indices down.
672                    true // Keep this index.
673                } else {
674                    *i != index // Remove if it's the deleted index, keep otherwise.
675                }
676            });
677        }
678        Ok(())
679    }
680
681    /// Mutates calldata of provided tx by abi decoding current values and randomly selecting the
682    /// inputs to change.
683    fn abi_mutate(
684        &self,
685        tx: &mut BasicTxDetails,
686        function: &Function,
687        test_runner: &mut TestRunner,
688        fuzz_state: &EvmFuzzState,
689    ) -> Result<()> {
690        // let rng = test_runner.rng();
691        let mut arg_mutation_rounds =
692            test_runner.rng().random_range(0..=function.inputs.len()).max(1);
693        let round_arg_idx: Vec<usize> = if function.inputs.len() <= 1 {
694            vec![0]
695        } else {
696            (0..arg_mutation_rounds)
697                .map(|_| test_runner.rng().random_range(0..function.inputs.len()))
698                .collect()
699        };
700        let mut prev_inputs = function
701            .abi_decode_input(&tx.call_details.calldata[4..])
702            .map_err(|err| eyre!("failed to load previous inputs: {err}"))?;
703
704        while arg_mutation_rounds > 0 {
705            let idx = round_arg_idx[arg_mutation_rounds - 1];
706            prev_inputs[idx] = mutate_param_value(
707                &function
708                    .inputs
709                    .get(idx)
710                    .expect("Could not get input to mutate")
711                    .selector_type()
712                    .parse()?,
713                prev_inputs[idx].clone(),
714                test_runner,
715                fuzz_state,
716            );
717            arg_mutation_rounds -= 1;
718        }
719
720        tx.call_details.calldata =
721            function.abi_encode_input(&prev_inputs).map_err(|e| eyre!(e.to_string()))?.into();
722        Ok(())
723    }
724
725    // Sync Methods.
726
727    /// Imports the new corpus entries from the `sync` directory.
728    /// These contain tx sequences which are replayed and used to update the history map.
729    fn load_sync_corpus(&self) -> Result<Vec<(CorpusDirEntry, Vec<BasicTxDetails>)>> {
730        let Some(worker_dir) = &self.worker_dir else {
731            return Ok(vec![]);
732        };
733
734        let sync_dir = worker_dir.join(SYNC_DIR);
735        if !sync_dir.is_dir() {
736            return Ok(vec![]);
737        }
738
739        let mut imports = vec![];
740        for entry in read_corpus_dir(&sync_dir) {
741            if entry.timestamp <= self.last_sync_timestamp {
742                continue;
743            }
744            let tx_seq = entry.read_tx_seq()?;
745            if tx_seq.is_empty() {
746                warn!(target: "corpus", "skipping empty corpus entry: {}", entry.path.display());
747                continue;
748            }
749            imports.push((entry, tx_seq));
750        }
751
752        if !imports.is_empty() {
753            debug!(target: "corpus", "imported {} new corpus entries", imports.len());
754        }
755
756        Ok(imports)
757    }
758
759    /// Syncs and calibrates the in memory corpus and updates the history_map if new coverage is
760    /// found from the corpus findings of other workers.
761    #[instrument(skip_all)]
762    fn calibrate(
763        &mut self,
764        executor: &Executor,
765        fuzzed_function: Option<&Function>,
766        fuzzed_contracts: Option<&FuzzRunIdentifiedContracts>,
767    ) -> Result<()> {
768        let Some(worker_dir) = &self.worker_dir else {
769            return Ok(());
770        };
771        let corpus_dir = worker_dir.join(CORPUS_DIR);
772
773        let mut executor = executor.clone();
774        for (entry, tx_seq) in self.load_sync_corpus()? {
775            let mut new_coverage_on_sync = false;
776            for tx in &tx_seq {
777                if !Self::can_replay_tx(tx, fuzzed_function, fuzzed_contracts) {
778                    continue;
779                }
780
781                let mut call_result = execute_tx(&mut executor, tx)?;
782
783                // Check if this provides new coverage.
784                let (new_coverage, is_edge) =
785                    call_result.merge_edge_coverage(&mut self.history_map);
786
787                if new_coverage {
788                    self.metrics.update_seen(is_edge);
789                    new_coverage_on_sync = true;
790                }
791
792                // Commit only for stateful tests.
793                if fuzzed_contracts.is_some() {
794                    executor.commit(&mut call_result);
795                }
796
797                trace!(
798                    target: "corpus",
799                    %new_coverage,
800                    ?tx,
801                    "replayed tx for syncing",
802                );
803            }
804
805            let sync_path = &entry.path;
806            if new_coverage_on_sync {
807                // Move file from sync/ to corpus/ directory.
808                let corpus_path = corpus_dir.join(sync_path.components().next_back().unwrap());
809                if let Err(err) = std::fs::rename(sync_path, &corpus_path) {
810                    debug!(target: "corpus", %err, "failed to move synced corpus from {sync_path:?} to {corpus_path:?} dir");
811                    continue;
812                }
813
814                debug!(
815                    target: "corpus",
816                    name=%entry.name(),
817                    "moved synced corpus to corpus dir",
818                );
819
820                let corpus_entry = CorpusEntry::new_existing(tx_seq.to_vec(), entry.path.clone())?;
821                self.in_memory_corpus.push(corpus_entry);
822            } else {
823                // Remove the file as it did not generate new coverage.
824                if let Err(err) = std::fs::remove_file(&entry.path) {
825                    debug!(target: "corpus", %err, "failed to remove synced corpus from {sync_path:?}");
826                    continue;
827                }
828                trace!(target: "corpus", "removed synced corpus from {sync_path:?}");
829            }
830        }
831
832        Ok(())
833    }
834
835    /// Exports the new corpus entries to the master worker's sync dir.
836    #[instrument(skip_all)]
837    fn export_to_master(&self) -> Result<()> {
838        // Master doesn't export (it only receives from others).
839        assert_ne!(self.id, 0, "non-master only");
840
841        // Early return if no new entries or corpus dir not configured.
842        if self.new_entry_indices.is_empty() || self.worker_dir.is_none() {
843            return Ok(());
844        }
845
846        let worker_dir = self.worker_dir.as_ref().unwrap();
847        let Some(master_sync_dir) = self
848            .config
849            .corpus_dir
850            .as_ref()
851            .map(|dir| dir.join(format!("{WORKER}0")).join(SYNC_DIR))
852        else {
853            return Ok(());
854        };
855
856        let mut exported = 0;
857        let corpus_dir = worker_dir.join(CORPUS_DIR);
858
859        for &index in &self.new_entry_indices {
860            let Some(corpus) = self.in_memory_corpus.get(index) else { continue };
861            let file_name = corpus.file_name(self.config.corpus_gzip);
862            let file_path = corpus_dir.join(&file_name);
863            let sync_path = master_sync_dir.join(&file_name);
864            if let Err(err) = std::fs::hard_link(&file_path, &sync_path) {
865                debug!(target: "corpus", %err, "failed to export corpus {}", corpus.uuid);
866                continue;
867            }
868            exported += 1;
869        }
870
871        debug!(target: "corpus", "exported {exported} new corpus entries");
872
873        Ok(())
874    }
875
876    /// Exports the global corpus to the `sync/` directories of all the non-master workers.
877    #[instrument(skip_all)]
878    fn export_to_workers(&mut self, num_workers: usize) -> Result<()> {
879        assert_eq!(self.id, 0, "master worker only");
880        if self.worker_dir.is_none() {
881            return Ok(());
882        }
883
884        let worker_dir = self.worker_dir.as_ref().unwrap();
885        let master_corpus_dir = worker_dir.join(CORPUS_DIR);
886        let filtered_master_corpus = read_corpus_dir(&master_corpus_dir)
887            .filter(|entry| entry.timestamp > self.last_sync_timestamp)
888            .collect::<Vec<_>>();
889        let mut any_distributed = false;
890        for target_worker in 1..num_workers {
891            let target_dir = self
892                .config
893                .corpus_dir
894                .as_ref()
895                .unwrap()
896                .join(format!("{WORKER}{target_worker}"))
897                .join(SYNC_DIR);
898
899            if !target_dir.is_dir() {
900                foundry_common::fs::create_dir_all(&target_dir)?;
901            }
902
903            for entry in &filtered_master_corpus {
904                let name = entry.name();
905                let sync_path = target_dir.join(name);
906                if let Err(err) = std::fs::hard_link(&entry.path, &sync_path) {
907                    debug!(target: "corpus", %err, from=?entry.path, to=?sync_path, "failed to distribute corpus");
908                    continue;
909                }
910                any_distributed = true;
911                trace!(target: "corpus", %name, ?target_dir, "distributed corpus");
912            }
913        }
914
915        debug!(target: "corpus", %any_distributed, "distributed master corpus to all workers");
916
917        Ok(())
918    }
919
920    // TODO(dani): currently only master syncs metrics?
921    /// Syncs local metrics with global corpus metrics by calculating and applying deltas.
922    pub(crate) fn sync_metrics(&mut self, global_corpus_metrics: &GlobalCorpusMetrics) {
923        // Calculate delta metrics since last sync.
924        let edges_delta = self
925            .metrics
926            .cumulative_edges_seen
927            .saturating_sub(self.last_sync_metrics.cumulative_edges_seen);
928        let features_delta = self
929            .metrics
930            .cumulative_features_seen
931            .saturating_sub(self.last_sync_metrics.cumulative_features_seen);
932        // For corpus count and favored items, calculate deltas.
933        let corpus_count_delta =
934            self.metrics.corpus_count as isize - self.last_sync_metrics.corpus_count as isize;
935        let favored_delta =
936            self.metrics.favored_items as isize - self.last_sync_metrics.favored_items as isize;
937
938        // Add delta values to global metrics.
939
940        if edges_delta > 0 {
941            global_corpus_metrics.cumulative_edges_seen.fetch_add(edges_delta, Ordering::Relaxed);
942        }
943        if features_delta > 0 {
944            global_corpus_metrics
945                .cumulative_features_seen
946                .fetch_add(features_delta, Ordering::Relaxed);
947        }
948
949        if corpus_count_delta > 0 {
950            global_corpus_metrics
951                .corpus_count
952                .fetch_add(corpus_count_delta as usize, Ordering::Relaxed);
953        } else if corpus_count_delta < 0 {
954            global_corpus_metrics
955                .corpus_count
956                .fetch_sub((-corpus_count_delta) as usize, Ordering::Relaxed);
957        }
958
959        if favored_delta > 0 {
960            global_corpus_metrics
961                .favored_items
962                .fetch_add(favored_delta as usize, Ordering::Relaxed);
963        } else if favored_delta < 0 {
964            global_corpus_metrics
965                .favored_items
966                .fetch_sub((-favored_delta) as usize, Ordering::Relaxed);
967        }
968
969        // Store current metrics as last sync metrics for next delta calculation.
970        self.last_sync_metrics = self.metrics.clone();
971    }
972
973    /// Syncs the workers in_memory_corpus and history_map with the findings from other workers.
974    #[instrument(skip_all)]
975    pub fn sync(
976        &mut self,
977        num_workers: usize,
978        executor: &Executor,
979        fuzzed_function: Option<&Function>,
980        fuzzed_contracts: Option<&FuzzRunIdentifiedContracts>,
981        global_corpus_metrics: &GlobalCorpusMetrics,
982    ) -> Result<()> {
983        trace!(target: "corpus", "syncing");
984
985        self.sync_metrics(global_corpus_metrics);
986
987        self.calibrate(executor, fuzzed_function, fuzzed_contracts)?;
988        if self.id == 0 {
989            self.export_to_workers(num_workers)?;
990        } else {
991            self.export_to_master()?;
992        }
993
994        let last_sync = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
995        self.last_sync_timestamp = last_sync;
996
997        self.new_entry_indices.clear();
998
999        debug!(target: "corpus", last_sync, "synced");
1000
1001        Ok(())
1002    }
1003
1004    /// Helper to check if a tx can be replayed.
1005    fn can_replay_tx(
1006        tx: &BasicTxDetails,
1007        fuzzed_function: Option<&Function>,
1008        fuzzed_contracts: Option<&FuzzRunIdentifiedContracts>,
1009    ) -> bool {
1010        fuzzed_contracts.is_some_and(|contracts| contracts.targets.lock().can_replay(tx))
1011            || fuzzed_function.is_some_and(|function| {
1012                tx.call_details
1013                    .calldata
1014                    .get(..4)
1015                    .is_some_and(|selector| function.selector() == selector)
1016            })
1017    }
1018}
1019
1020fn read_corpus_dir(path: &Path) -> impl Iterator<Item = CorpusDirEntry> {
1021    let dir = match std::fs::read_dir(path) {
1022        Ok(dir) => dir,
1023        Err(err) => {
1024            debug!(%err, ?path, "failed to read corpus directory");
1025            return vec![].into_iter();
1026        }
1027    };
1028    dir.filter_map(|res| match res {
1029        Ok(entry) => {
1030            let path = entry.path();
1031            if !path.is_file() {
1032                return None;
1033            }
1034            let name = if path.is_file()
1035                && let Some(name) = path.file_name()
1036                && let Some(name) = name.to_str()
1037            {
1038                name
1039            } else {
1040                return None;
1041            };
1042
1043            if let Ok((uuid, timestamp)) = parse_corpus_filename(name) {
1044                Some(CorpusDirEntry { path, uuid, timestamp })
1045            } else {
1046                debug!(target: "corpus", ?path, "failed to parse corpus filename");
1047                None
1048            }
1049        }
1050        Err(err) => {
1051            debug!(%err, "failed to read corpus directory entry");
1052            None
1053        }
1054    })
1055    .collect::<Vec<_>>()
1056    .into_iter()
1057}
1058
1059struct CorpusDirEntry {
1060    path: PathBuf,
1061    uuid: Uuid,
1062    timestamp: u64,
1063}
1064
1065impl CorpusDirEntry {
1066    fn name(&self) -> &str {
1067        self.path.file_name().unwrap().to_str().unwrap()
1068    }
1069
1070    fn read_tx_seq(&self) -> foundry_common::fs::Result<Vec<BasicTxDetails>> {
1071        let path = &self.path;
1072        if path.extension() == Some("gz".as_ref()) {
1073            foundry_common::fs::read_json_gzip_file(path)
1074        } else {
1075            foundry_common::fs::read_json_file(path)
1076        }
1077    }
1078}
1079
1080/// Parses the corpus filename and returns the uuid and timestamp associated with it.
1081fn parse_corpus_filename(name: &str) -> Result<(Uuid, u64)> {
1082    let name = name.trim_end_matches(".gz").trim_end_matches(".json");
1083
1084    let (uuid_str, timestamp_str) =
1085        name.rsplit_once('-').ok_or_else(|| eyre!("invalid corpus filename format: {name}"))?;
1086
1087    let uuid = Uuid::parse_str(uuid_str)?;
1088    let timestamp = timestamp_str.parse()?;
1089
1090    Ok((uuid, timestamp))
1091}
1092
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::Address;
    use std::fs;

    /// A minimal tx: zero sender and target, empty calldata, no warp/roll.
    fn basic_tx() -> BasicTxDetails {
        BasicTxDetails {
            warp: None,
            roll: None,
            sender: Address::ZERO,
            call_details: foundry_evm_fuzz::CallDetails {
                target: Address::ZERO,
                calldata: Bytes::new(),
            },
        }
    }

    /// Returns a fresh, uniquely named directory under the system temp dir, creating it on a
    /// best-effort basis.
    fn temp_corpus_dir() -> PathBuf {
        let dir = std::env::temp_dir().join(format!("foundry-corpus-tests-{}", Uuid::new_v4()));
        let _ = fs::create_dir_all(&dir);
        dir
    }

    /// Assembles a master (`id: 0`) worker corpus with an empty coverage map and default
    /// metrics around the given config, in-memory corpus, mutation cursor and worker dir.
    fn build_manager(
        config: FuzzCorpusConfig,
        in_memory_corpus: Vec<CorpusEntry>,
        current_mutated: Option<Uuid>,
        worker_dir: PathBuf,
    ) -> WorkerCorpus {
        WorkerCorpus {
            id: 0,
            tx_generator: Just(basic_tx()).boxed(),
            mutation_generator: Just(MutationType::Repeat).boxed(),
            config: config.into(),
            in_memory_corpus,
            current_mutated,
            failed_replays: 0,
            history_map: vec![0u8; COVERAGE_MAP_SIZE],
            metrics: CorpusMetrics::default(),
            new_entry_indices: Default::default(),
            last_sync_timestamp: 0,
            worker_dir: Some(worker_dir),
            last_sync_metrics: CorpusMetrics::default(),
        }
    }

    /// Mutable access to the in-memory corpus entry with the given uuid.
    fn find_entry_mut(manager: &mut WorkerCorpus, uuid: Uuid) -> &mut CorpusEntry {
        manager.in_memory_corpus.iter_mut().find(|c| c.uuid == uuid).unwrap()
    }

    /// Shared access to the in-memory corpus entry with the given uuid.
    fn find_entry(manager: &WorkerCorpus, uuid: Uuid) -> &CorpusEntry {
        manager.in_memory_corpus.iter().find(|c| c.uuid == uuid).unwrap()
    }

    /// Builds a manager holding a single one-tx corpus entry, which is also the current
    /// mutation target. Returns the manager and that entry's uuid.
    fn new_manager_with_single_corpus() -> (WorkerCorpus, Uuid) {
        let config = FuzzCorpusConfig {
            corpus_dir: Some(temp_corpus_dir()),
            corpus_gzip: false,
            corpus_min_mutations: 0,
            corpus_min_size: 0,
            ..Default::default()
        };

        let seed = CorpusEntry::new(vec![basic_tx()]);
        let seed_uuid = seed.uuid;

        // Corpus root plus the master's `worker0` subdirectory.
        let corpus_root = config.corpus_dir.clone().unwrap();
        let _ = fs::create_dir_all(corpus_root.join("worker0"));

        let manager = build_manager(config, vec![seed], Some(seed_uuid), corpus_root);
        (manager, seed_uuid)
    }

    #[test]
    fn favored_sets_true_and_metrics_increment_when_ratio_gt_threshold() {
        let (mut manager, uuid) = new_manager_with_single_corpus();

        // 2/4 = 0.5 now; a run with new coverage makes it 3/5 = 0.6 > 0.3.
        let seed = find_entry_mut(&mut manager, uuid);
        seed.total_mutations = 4;
        seed.new_finds_produced = 2;
        seed.is_favored = false;

        assert_eq!(manager.metrics.favored_items, 0);

        manager.current_mutated = Some(uuid);
        manager.process_inputs(&[basic_tx()], true);

        assert!(
            find_entry(&manager, uuid).is_favored,
            "expected favored to be true when ratio > threshold"
        );
        assert_eq!(
            manager.metrics.favored_items, 1,
            "favored_items should increment on false→true"
        );
    }

    #[test]
    fn favored_sets_false_and_metrics_decrement_when_ratio_lt_threshold() {
        let (mut manager, uuid) = new_manager_with_single_corpus();

        // 3/9 ≈ 0.333 > 0.3 now; a run without coverage makes it 3/10 = 0.3, not favored.
        let seed = find_entry_mut(&mut manager, uuid);
        seed.total_mutations = 9;
        seed.new_finds_produced = 3;
        seed.is_favored = true;

        manager.metrics.favored_items = 1;

        manager.current_mutated = Some(uuid);
        manager.process_inputs(&[basic_tx()], false);

        assert!(
            !find_entry(&manager, uuid).is_favored,
            "expected favored to be false when ratio < threshold"
        );
        assert_eq!(
            manager.metrics.favored_items, 0,
            "favored_items should decrement on true→false"
        );
    }

    #[test]
    fn favored_is_false_on_ratio_equal_threshold() {
        let (mut manager, uuid) = new_manager_with_single_corpus();

        // A run with new coverage moves totals to 10 mutations / 3 finds, i.e. exactly 0.3.
        let seed = find_entry_mut(&mut manager, uuid);
        seed.total_mutations = 9;
        seed.new_finds_produced = 2;
        seed.is_favored = false;

        manager.current_mutated = Some(uuid);
        manager.process_inputs(&[basic_tx()], true);

        assert!(
            !(find_entry(&manager, uuid).is_favored),
            "with strict '>' comparison, favored must be false when ratio == threshold"
        );
    }

    #[test]
    fn eviction_skips_favored_and_evicts_non_favored() {
        let config = FuzzCorpusConfig {
            corpus_dir: Some(temp_corpus_dir()),
            corpus_min_mutations: 0,
            corpus_min_size: 0,
            ..Default::default()
        };

        // Two single-tx entries with two mutations each, differing only in favored status.
        let make_entry = |favored: bool| {
            let mut entry = CorpusEntry::new(vec![basic_tx()]);
            entry.total_mutations = 2;
            entry.is_favored = favored;
            entry
        };
        let favored = make_entry(true);
        let non_favored = make_entry(false);
        let non_favored_uuid = non_favored.uuid;

        let corpus_root = temp_corpus_dir();
        fs::create_dir_all(corpus_root.join("worker0")).unwrap();

        let mut manager = build_manager(config, vec![favored, non_favored], None, corpus_root);

        // The first eviction removes the only non-favored entry.
        manager.evict_oldest_corpus().unwrap();
        assert_eq!(manager.in_memory_corpus.len(), 1);
        assert!(manager.in_memory_corpus.iter().all(|c| c.is_favored));

        // With only a favored entry left, eviction must be a no-op.
        manager.evict_oldest_corpus().unwrap();
        assert_eq!(manager.in_memory_corpus.len(), 1, "favored corpus must not be evicted");

        // The entry that went away is the non-favored one.
        assert!(manager.in_memory_corpus.iter().all(|c| c.uuid != non_favored_uuid));
    }
}