Skip to content

Commit

Permalink
feat: implement EngineApiRequestHandler::poll (#9670)
Browse files Browse the repository at this point in the history
Co-authored-by: Federico Gimenez <[email protected]>
  • Loading branch information
mattsse and fgimenez committed Jul 20, 2024
1 parent 1c13121 commit c8e6e37
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
10 changes: 5 additions & 5 deletions crates/engine/tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ where
match this.handler.poll(cx) {
Poll::Ready(handler_event) => {
match handler_event {
HandlerEvent::BackfillSync(target) => {
// trigger backfill sync and start polling it
this.backfill_sync.on_action(BackfillAction::Start(target));
HandlerEvent::BackfillSync(action) => {
// forward action to backfill_sync
this.backfill_sync.on_action(action);
continue 'outer
}
HandlerEvent::Event(ev) => {
Expand Down Expand Up @@ -187,8 +187,8 @@ pub trait ChainHandler: Send + Sync {
/// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
pub enum HandlerEvent<T> {
/// Request to start a backfill sync
BackfillSync(PipelineTarget),
/// Request an action to backfill sync
BackfillSync(BackfillAction),
/// Other event emitted by the handler
Event(T),
}
Expand Down
22 changes: 19 additions & 3 deletions crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_primitives::{SealedBlockWithSenders, B256};
use std::{
collections::HashSet,
sync::mpsc::Sender,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tokio::sync::mpsc::UnboundedReceiver;

Expand Down Expand Up @@ -177,7 +177,7 @@ impl<T> EngineRequestHandler for EngineApiRequestHandler<T>
where
T: EngineTypes,
{
type Event = EngineApiEvent;
type Event = BeaconConsensusEngineEvent;
type Request = BeaconEngineMessage<T>;

fn on_event(&mut self, event: FromEngine<Self::Request>) {
Expand All @@ -186,7 +186,23 @@ where
}

fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
todo!("poll tree")
let Some(ev) = ready!(self.from_tree.poll_recv(cx)) else { return Poll::Pending };
let ev = match ev {
EngineApiEvent::BeaconConsensus(ev) => {
RequestHandlerEvent::HandlerEvent(HandlerEvent::Event(ev))
}
EngineApiEvent::FromTree(ev) => match ev {
TreeEvent::BackfillAction(target) => {
RequestHandlerEvent::HandlerEvent(HandlerEvent::BackfillSync(target))
}
TreeEvent::Download(download) => RequestHandlerEvent::Download(download),
TreeEvent::TreeAction(ev) => {
// TODO revise this
return Poll::Pending
}
},
};
Poll::Ready(ev)
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/ethereum/engine/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use reth_beacon_consensus::{BeaconEngineMessage, EthBeaconConsensus};
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage, EthBeaconConsensus};
use reth_chainspec::ChainSpec;
use reth_db_api::database::Database;
use reth_engine_tree::{
backfill::PipelineSync,
chain::{ChainEvent, ChainOrchestrator},
download::BasicBlockDownloader,
engine::{EngineApiEvent, EngineApiRequestHandler, EngineHandler},
engine::{EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::EngineApiTreeHandlerImpl,
};
Expand Down Expand Up @@ -96,7 +96,7 @@ where
DB: Database + 'static,
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
type Item = ChainEvent<EngineApiEvent>;
type Item = ChainEvent<BeaconConsensusEngineEvent>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut orchestrator = self.project().orchestrator;
Expand Down

0 comments on commit c8e6e37

Please sign in to comment.