Move cmd coherence result partitioning to macro.
Because we're not cool enough to know what arcane type signature is needed to add an extension method to the stream itself.
This commit is contained in:
parent
c2551ac4ed
commit
d23f09295e
|
@ -1,4 +1,5 @@
|
||||||
use super::converter::validate_event_coherence;
|
use super::converter::validate_event_coherence;
|
||||||
|
use super::partition;
|
||||||
use crate::{
|
use crate::{
|
||||||
ai::logic::AiLogic,
|
ai::logic::AiLogic,
|
||||||
db::Database,
|
db::Database,
|
||||||
|
@ -38,20 +39,9 @@ impl CommandCoherence<'_> {
|
||||||
&self,
|
&self,
|
||||||
failures: Vec<EventCoherenceFailure>,
|
failures: Vec<EventCoherenceFailure>,
|
||||||
) -> ExecutionConversionResult {
|
) -> ExecutionConversionResult {
|
||||||
let (successes, failures): (Vec<_>, Vec<_>) = stream::iter(failures.into_iter())
|
let (successes, failures) = partition!(
|
||||||
.then(|failure| self.cohere_event(failure))
|
stream::iter(failures.into_iter()).then(|failure| self.cohere_event(failure))
|
||||||
.fold(
|
);
|
||||||
(vec![], vec![]),
|
|
||||||
|(mut successes, mut failures), res| async {
|
|
||||||
match res {
|
|
||||||
Ok(event) => successes.push(event),
|
|
||||||
Err(err) => failures.push(err),
|
|
||||||
};
|
|
||||||
|
|
||||||
(successes, failures)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// TODO we need to use LLM on events that have failed non-LLM coherence.
|
// TODO we need to use LLM on events that have failed non-LLM coherence.
|
||||||
|
|
||||||
|
@ -68,7 +58,7 @@ impl CommandCoherence<'_> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn cohere_event(&self, failure: EventCoherenceFailure) -> CoherenceResult {
|
async fn cohere_event(&self, failure: EventCoherenceFailure) -> CoherenceResult {
|
||||||
let event = async {
|
let event_fix = async {
|
||||||
match failure {
|
match failure {
|
||||||
EventCoherenceFailure::TargetDoesNotExist(event) => {
|
EventCoherenceFailure::TargetDoesNotExist(event) => {
|
||||||
self.fix_target_does_not_exist(event).await
|
self.fix_target_does_not_exist(event).await
|
||||||
|
@ -77,7 +67,7 @@ impl CommandCoherence<'_> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
event
|
event_fix
|
||||||
.and_then(|e| validate_event_coherence(&self.db, e))
|
.and_then(|e| validate_event_coherence(&self.db, e))
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
use super::coherence::strip_prefixes;
|
||||||
|
use super::partition;
|
||||||
use crate::{
|
use crate::{
|
||||||
db::Database,
|
db::Database,
|
||||||
models::commands::{
|
models::commands::{
|
||||||
|
@ -9,7 +11,6 @@ use anyhow::Result;
|
||||||
use futures::stream::{self, StreamExt, TryStreamExt};
|
use futures::stream::{self, StreamExt, TryStreamExt};
|
||||||
use itertools::{Either, Itertools};
|
use itertools::{Either, Itertools};
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use super::coherence::strip_prefixes;
|
|
||||||
|
|
||||||
use strum::VariantNames;
|
use strum::VariantNames;
|
||||||
|
|
||||||
|
@ -85,15 +86,9 @@ pub async fn convert_raw_execution(
|
||||||
});
|
});
|
||||||
|
|
||||||
// Coherence validation of converted events.
|
// Coherence validation of converted events.
|
||||||
let (successes, incoherent_events): (Vec<_>, Vec<_>) = stream::iter(converted.into_iter())
|
let (successes, incoherent_events) = partition!(
|
||||||
.then(|event| validate_event_coherence(db, event))
|
stream::iter(converted.into_iter()).then(|event| validate_event_coherence(db, event))
|
||||||
.collect::<Vec<_>>()
|
);
|
||||||
.await
|
|
||||||
.into_iter()
|
|
||||||
.partition_map(|res| match res {
|
|
||||||
Ok(event) => Either::Left(event),
|
|
||||||
Err(err) => Either::Right(err),
|
|
||||||
});
|
|
||||||
|
|
||||||
let failure_len = conversion_failures.len() + incoherent_events.len();
|
let failure_len = conversion_failures.len() + incoherent_events.len();
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,27 @@ use crate::{
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
/// Splits up a stream of results into successes and failures.
|
||||||
|
macro_rules! partition {
|
||||||
|
($stream: expr) => {
|
||||||
|
$stream
|
||||||
|
.fold(
|
||||||
|
(vec![], vec![]),
|
||||||
|
|(mut successes, mut failures), res| async {
|
||||||
|
match res {
|
||||||
|
Ok(event) => successes.push(event),
|
||||||
|
Err(err) => failures.push(err),
|
||||||
|
};
|
||||||
|
|
||||||
|
(successes, failures)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(self) use partition;
|
||||||
|
|
||||||
pub mod builtins;
|
pub mod builtins;
|
||||||
pub mod coherence;
|
pub mod coherence;
|
||||||
pub mod converter;
|
pub mod converter;
|
||||||
|
|
Loading…
Reference in New Issue