Rust E0597 错误引发的一次思考
7 min read

Rust E0597 错误引发的一次思考

常见的架构设计会非常自然的按照业务逻辑、数据逻辑、控制逻辑等作为标准进行架构设计时内聚、解耦的评判,Rust 项目中可能还需要兼顾生命周期的解耦与内聚,例如 static 生命周期的模块、逻辑不太适合嵌套到 crate 或者子 mod 里面。
Rust E0597 错误引发的一次思考
Photo by David Pupaza / Unsplash

Rust E0597 错误案发现场

最近工作涉及的项目跟 Rust 编译器杠上了,花了一天才解决了一个编译报错。这件事值得对 Rust 生命周期加深一下理解,同时对 Rust 项目架构设计进行一次经验总结。

E0597 报错了

rust project architecture

如上图所示,是 Rust 项目的简化架构示意图(箭头表示调用关系),这个项目主要做的是在 k8s 场景下解决 bpf 管理的问题,上图所示的是一部分业务逻辑,在 watch 到 k8s operator 的 CRUD 之后触发 node host 上的 daemonset 的管理逻辑:
step1:watch CRUD event 之后通知 manager c;
step2:manager c 根据 event 事件介入 manager A 的管理逻辑,包括 object、resource 的管理;
step3:manager c 根据 event 事件初始化数据处理模块,从而根据 resource 中的数据进行审计、日志等数据输出;

为了能够做到 outputer 兼容各种 type,对于 outputer 模块初始化的入参抽象了一个 trait:

pub trait Handler {
    fn handler(&self, cpu: i32, data: &[u8]) -> i32;
    fn lost_handler(&self, cpu: i32, count: u64);
    fn event_type(&self) -> String;
}

初始化这个 outputer 的方式如下所示:

pub fn init_handler<'s>(name: String) -> Box<dyn Handler + Send + Sync + 's> {
    match type.as_str() {
        "typeA" => Box::new(crateA::DataHandler::new()),
        "typeB" => Box::new(crateB::DataHandler::new()),
        "typeC" => Box::new(crateC::DataHandler::new()),
        _ => Box::new(DefaultHandler::new(
            |cpu, count| {
                trace!("WARNING: default handler, count:{:?}, cpu:{}", count, cpu);
                cpu
            },
            None,
        )),
    }
}

问题来了:

// This manager A
lazy_static! {
    static ref OUTPUTER_STORE: OutputerStore = OutputerStore::new();
}

pub struct OutputerStore {
    store: Arc<Mutex<Vec<Outputer>>>,
}

impl OutputerStore {
	pub fn get(&self, id: String) -> Result<Outputer> {
		match OUTPUTER_STORE.store.lock() {
            Ok(v) => {
                for out in v.iter().map(|o| o.get_name().eq(&id_name)) {
                //
                // Rust compiling failed:
                // borrowed value does not live long enough, `o` dropped here while still borrowed
				// error code: E0597
				//
	                return Ok(*o)
				}
		}
		return Err(anyhow!("no {} found", id.clone()));
	}
}

// This manager B
pub fn install_object(crud: K8SCRUD) -> Result<()> {
	// 省略若干代码

	let event = Event::new(crud.hookpoint.clone());
    // managerA load and attach observer bpf program(ufunc)
    match ManagerA::get_manager_A()
        .create_uprobe_bpf_program_object(&event, crud)
    {
        Ok(()) => (),
        Err(e) => return Err(anyhow!("load bpf program error: {}", e)),
    }

    // create managerA resource
    ManagerA::get_manager_A()
        .init_observer_bpf_map(&crud.hookpoint.name, event.get_uuid().clone())?;

    // startup managerC outputer thread for listening
    let mut output_config = OutputConfig::new();
	let handler = init_handler(crud.hookpoint.name.clone());
	Parser::parse(&mut output_config, o.clone())?;
	OUTPUTER_STORE.get(crud.hookpoint.name.clone())?.startup()?;

	// 省略若干代码
}

// This is manager C
pub fn init_listener(&'static self) -> Result<()> {
    let listen_type = self.1.event_type().clone();
    match listen_type.as_str() {
        "perf" => {
            // use perf buffer
            let event_handler = move |cpu: i32, data: &[u8]| {
                let _ = self.1.handler(cpu, data);
            };
            let lost_handler = move |cpu: i32, count: u64| {
                self.1.lost_handler(cpu, count);
            };
            //let bpf_manager = BpfManager::get_bpf_manager();
            let perf_map = match BpfManager::get_bpf_manager()
                .get_bpf_maps_hashmap()?
                .get(&self.0.get_uuid())
            {
                Some(map) => {
                    let perf_map = map.get_libbpf_map_as_ref();
                    trace!("audit map ok, map type = {}", perf_map.map_type());
                    PerfBufferBuilder::new(perf_map)
                        .sample_cb(event_handler)
                        .lost_cb(lost_handler)
                        .build()?
                }
                None => return Err(anyhow!("no perf buffer map[{}] found", self.0.get_uuid())),
            };
            loop {
                // poll_wait here
                if let Err(e) = perf_map.poll(Duration::from_millis(500)) {
                    error!("Failed to poll perf buffer: {}", e);
                    return Err(anyhow!("poll perf buffer failed: {}", e));
                }
            }
        }
        "ring" => {
            // use ring buffer
            let mut ringbuf_builder = RingBufferBuilder::default();
            let event_handler = move |data: &[u8]| self.1.handler(-1, data);
            match BpfManager::get_bpf_manager()
                .get_bpf_maps_hashmap()?
                .get(&self.0.get_uuid())
            {
                Some(map) => {
                    let audit_map = map.get_libbpf_map_as_ref();
                    trace!("audit map ok, map type = {}", audit_map.map_type());
                    ringbuf_builder.add(audit_map, event_handler)?;
                }
                None => bail!("get ringbuf map failed"),
            }
            let ring_map = match ringbuf_builder.build() {
                Ok(ringbuf) => ringbuf,
                Err(e) => bail!("audit failed: {}", e),
            };
            loop {
                if let Err(e) = ring_map.poll(Duration::from_millis(500)) {
                    error!("Failed to poll perf buffer: {}", e);
                    return Err(anyhow!("poll ring buffer failed: {}", e));
                }
            }
            return Err(anyhow!("not support ringbuffer for now"));
        }
        _ => Ok(()),
    }
}

pub fn startup(&'static self) -> Result<()> {
	let thread_handler = thread::Builder::new()
	    .name(o.get_uuid())
	    .stack_size(4 * 1024 * 1024)
	    .spawn(move || {
	        trace!("create outputer{} thread", o.get_uuid());
	        // auditor thread should be blocked to  wait in poller,
	        // if the thread is not blocking, it should be panicked for system error
	        self.init_listener()
	            .expect("startup auditor poller failed");
	    });
	
	// 省略若干代码

	return Ok(());
}

问题分析

  • init_listener 之所以要是用 &'static self 是由 libbpf-rs 这个基础库决定的,ringbuffer add 是一个 static callback 也就是我们自定义的 Handler trait 需要是 static 的;
libbpf-rs bpf ringbuffer function definition
  • OUTPUTER_STORE 是一个全局队列,用于管理所有的 outputer 的生命周期;
  • 初步的原因是因为 get 出来的是 outputer 的引用,这个引用在 get 函数结束之后会被自动 dropp 掉,然而 startup 会启动数据监听的线程,直到 bpf object 结束即与 OUTPTER_STORE 中的生命周期保持一致,而 init_handler 实质需要 static handler,所以需要 get 到的 outputer 是 static,即's == 'static,这就是报错根本原因;

如何解决

尝试了很多方法:

  • 获取 OUTPUTER_STORE 中 outputer 元素的所有权;
  • Vec.into_vec 方法;
  • 为 outputer 增加生命周期注解's;
    上述几个方法均以失败告终。

经过一整天的尝试之后,不得已启用了针对 Rust 的终极手段:重构模块设计,这个问题进行抽象就是创建启动 thread 的生命周期管理的问题。outputer(包含 Handler trait)作为启动线程的实例由于生命周期是 static,将它集成到 crate 的某个 mod 中会很容易导致生命周期的冲突,所以把 outputer 作为一个 static 实例比较合适,那么作为 manager A 管理的全局的 resource 也就比较合理,于是就有了如下的重构:

rust project refactoring

思考

Rust 生命周期的理解

  1. crate/mod 的生命周期设计不是特别难以理解,项目中会出现生命周期问题往往都是历史的债务,例如项目依赖了一个没有很好生命周期设计的 Rust package;
  2. unsafe 对 Rust 的设计来说是潘多拉魔盒,尽量少使用;
  3. 生命周期问题出现的时候,如果实在没有办法解决了,可以尝试重构你的架构设计了;

Rust 项目架构设计思考

常见的架构设计会非常自然的按照业务逻辑、数据逻辑、控制逻辑等作为标准进行架构设计时内聚、解耦的评判,Rust 项目中可能还需要兼顾生命周期的解耦与内聚,例如 static 生命周期的模块、逻辑不太适合嵌套到 crate 或者子 mod 里面。

其实我一直有一个想法:总结一下各种好的设计模式在不同语言中的运用,其实所谓的设计模式就是针对某种编程语言的特点而总结最佳编程实践,可以帮助提升代码的可扩展性、可维护性,避开编程语言特性(不足)的坑。市面上总结比较多的设计模式大多是针对 C++、Java 的,其他语言很少有比较系统的总结,Go、Rust 这种比较新的语言其实更需要这样的东西。希望将来我能总结出类似的东西。

今天正好读到了一个关于设计模式的 thread 可以作为扩展阅读 👀,特别是 reply 部分有很多大牛的回复值得一读。

P.S. 如果各位读者有什么好的设计,欢迎给我指点迷津,Rust 的学习曲线还是太陡峭了,我可能连入门都算不上。

References

  1. rust - Nesting Structs: "borrowed value does not live long enough" - Stack Overflow
  2. rust - Argument requires that _ is borrowed for 'static - how do I work round this? - Stack Overflow
足迹