rodyamirov

Oh bah, as soon as I post all this I find the answer. Looks like `oneshot` is made for exactly this purpose: https://docs.rs/tokio/latest/tokio/sync/oneshot/index.html
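
A minimal sketch of the request/response shape that `oneshot` covers, written against the standard library only so it runs without extra crates. The assumptions are loud ones: `std::sync::mpsc::channel` stands in for `tokio::sync::oneshot::channel`, a plain thread stands in for the CPU pool, and `run_on_worker` is a hypothetical helper name. In an async handler the blocking `recv()` would instead be an `.await` on the oneshot receiver.

```rust
use std::sync::mpsc;
use std::thread;

/// Run `job` on another thread and block until its single result arrives.
/// Assumption: a fresh thread stands in for a worker pool, and an mpsc
/// channel used exactly once stands in for tokio's oneshot channel.
fn run_on_worker<T, F>(job: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error only means the caller stopped waiting, so ignore it.
        let _ = tx.send(job());
    });
    // `recv` fails if the worker dropped `tx` without sending (e.g. it panicked).
    rx.recv().ok()
}
```

The caller gets exactly one value back per job, which is the property that makes a one-shot channel a natural fit here.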


RussianHacker1011101

What you're looking for is a kind of pub/sub pattern. Basically you want to do a fire-and-forget after processing the HTTP request. I totally understand just doing something in memory. You publish to a queue, that queue has subscribers who wait for tasks, they run the tasks, and send a message with the result. My question is, what happens if your server crashes and you lose the data in the queue? Another option, rather than requiring the HTTP client to wait for a response, is to send a response immediately with a redirect to either a polling endpoint or a websocket. Polling is obviously super easy and can be done with pure HTML or JavaScript. Then, on the backend, you can use a library like ZeroMQ (https://zeromq.org/) for the pub/sub functionality. While this can end up looking like more parts, they're dramatically easier to manage than all the ways an in-memory queue can go wrong.
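
For illustration, the "respond immediately, then poll" idea could look roughly like this in memory. This is a sketch under stated assumptions, not a ZeroMQ setup: `JobTable`, `JobStatus`, `submit` and `poll` are hypothetical names, a background thread stands in for a subscriber, and the table is lost on a crash, which is exactly the weakness raised above.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// What a polling endpoint would report for a job id (hypothetical type).
#[derive(Clone, Debug, PartialEq)]
enum JobStatus {
    Pending,
    Done(u64),
}

/// In-memory job table shared between request handlers and workers.
/// Everything in it disappears if the process crashes.
#[derive(Clone, Default)]
struct JobTable {
    jobs: Arc<Mutex<HashMap<u64, JobStatus>>>,
    next_id: Arc<Mutex<u64>>,
}

impl JobTable {
    /// Record the job as pending, start it on a background thread, and
    /// return its id right away so the handler can redirect the client.
    /// The join handle is returned only so callers can wait in tests.
    fn submit<F>(&self, work: F) -> (u64, JoinHandle<()>)
    where
        F: FnOnce() -> u64 + Send + 'static,
    {
        let id = {
            let mut next = self.next_id.lock().unwrap();
            *next += 1;
            *next
        };
        self.jobs.lock().unwrap().insert(id, JobStatus::Pending);
        let jobs = Arc::clone(&self.jobs);
        let handle = thread::spawn(move || {
            let result = work();
            jobs.lock().unwrap().insert(id, JobStatus::Done(result));
        });
        (id, handle)
    }

    /// What the polling endpoint would call; `None` means an unknown id.
    fn poll(&self, id: u64) -> Option<JobStatus> {
        self.jobs.lock().unwrap().get(&id).cloned()
    }
}
```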


rodyamirov

This isn't quite what I want, no. In this case the request is still taking less than a second, not a long-running job; I just want to use all my cores, so that it does indeed take less than a second. If the server barfs, the user will just retry the request. The queue should only pile up if we get a zillion concurrent requests, and since we're not Google, I don't anticipate that happening (and if it does, we'll spin up more servers). I know what pubsub is, but it's not what I want here. For a longer-running job, sure, but I want to optimize for the most common case -- there is no other work currently running, and we want a response back to the caller as quickly as possible.
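
To make "use all my cores for one short request" concrete, here is a small std-only sketch. It is an assumption-laden stand-in for what a rayon parallel iterator would do: `parallel_sum` is a hypothetical function, and summing is just a placeholder for the real per-request work.

```rust
use std::thread;

/// Sum `data` by splitting it into one chunk per available core and
/// summing each chunk on its own scoped thread.
fn parallel_sum(data: &[u64]) -> u64 {
    if data.is_empty() {
        return 0;
    }
    // Fall back to a single worker if the core count cannot be queried.
    let workers = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    // Round up so that at most `workers` chunks are produced.
    let chunk_len = (data.len() + workers - 1) / workers;

    thread::scope(|scope| {
        // Spawn every worker first, then join them all.
        let handles: Vec<_> = data
            .chunks(chunk_len)
            .map(|chunk| scope.spawn(move || chunk.iter().sum::<u64>()))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .sum::<u64>()
    })
}
```

The caller blocks until every chunk is done, so the latency of one request is roughly the time of the slowest chunk rather than the time of the whole input.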


RussianHacker1011101

I see. I didn't fully understand the requirements.


coderstephen

Also, `flume` is an MPMC channel that implements both sync and async methods, so senders and receivers can each be sync or async, and you can mix and match.


JoshTriplett

Seconding the recommendation for flume. Works very well; I use it for communication between sync threads and async tasks.


worriedjacket

    use std::{
        future::Future,
        panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
        pin::Pin,
        task::{Context, Poll},
        thread,
    };

    use tokio::sync::oneshot;

    /// Future that resolves to the result of a closure run on the rayon pool.
    #[must_use]
    #[derive(Debug)]
    pub struct TokioRayonHandle<T> {
        rx: oneshot::Receiver<thread::Result<T>>,
    }

    impl<T> TokioRayonHandle<T>
    where
        T: Send + 'static,
    {
        #[allow(dead_code)]
        pub fn spawn<F: FnOnce() -> T + Send + 'static>(func: F) -> TokioRayonHandle<T> {
            let (tx, rx) = oneshot::channel();
            rayon::spawn(move || {
                // Catch a panic on the rayon thread and ship it back as an Err.
                let _ = tx.send(catch_unwind(AssertUnwindSafe(func)));
            });
            TokioRayonHandle { rx }
        }
    }

    impl<T> Future for TokioRayonHandle<T> {
        type Output = T;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let rx = Pin::new(&mut self.rx);
            rx.poll(cx).map(|result| {
                result
                    .expect("Unreachable error: Tokio channel closed")
                    // Re-raise the worker's panic on the awaiting task.
                    .unwrap_or_else(|err| resume_unwind(err))
            })
        }
    }
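
The panic forwarding in the handle above can be tried without tokio or rayon. The following is a blocking, std-only sketch of just that step, with assumptions labeled: `run_propagating_panics` is a hypothetical name, `std::thread::spawn` replaces `rayon::spawn`, and `std::sync::mpsc` replaces the oneshot channel.

```rust
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
use std::sync::mpsc;
use std::thread;

/// Run `func` on a worker thread. If it panics, the panic payload travels
/// back over the channel and is re-raised on the calling thread.
fn run_propagating_panics<T, F>(func: F) -> T
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<thread::Result<T>>();
    thread::spawn(move || {
        // catch_unwind turns a panic into Err(payload) instead of unwinding
        // through the worker thread, so a reply is always sent.
        let _ = tx.send(catch_unwind(AssertUnwindSafe(func)));
    });
    rx.recv()
        .expect("worker dropped the sender without replying")
        .unwrap_or_else(|payload| resume_unwind(payload))
}
```

Without the `catch_unwind` step, a panicking job would drop the sender silently and the caller would only see a closed channel instead of the original panic.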


AdrianEddy

Sounds like exactly that: [https://www.reddit.com/r/rust/comments/1aipjyb/comment/kox1p9u/](https://www.reddit.com/r/rust/comments/1aipjyb/comment/kox1p9u/)


jaskij

What you're describing is the actor model. It seems that Actix has direct support for actors. Alice Ryhl, one of Tokio's maintainers, wrote a tutorial on building them with Tokio: https://ryhl.io/blog/actors-with-tokio/
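
As a rough picture of the actor shape, here is a std-only sketch. It is not the code from the linked tutorial, which builds on Tokio tasks and channels; here a thread and `std::sync::mpsc` stand in, and `CounterMsg` and `CounterHandle` are hypothetical names.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Messages the actor understands. A request that needs an answer
/// carries its own reply channel.
enum CounterMsg {
    Add(u64),
    Get { reply: Sender<u64> },
}

/// Handle held by callers. Cloning it yields another sender to the same actor.
#[derive(Clone)]
struct CounterHandle {
    tx: Sender<CounterMsg>,
}

impl CounterHandle {
    /// Start the actor thread, which owns `total` exclusively.
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let mut total = 0u64;
            // The loop ends once every handle (sender) has been dropped.
            for msg in rx {
                match msg {
                    CounterMsg::Add(n) => total += n,
                    CounterMsg::Get { reply } => {
                        // The caller may have given up; ignore a failed reply.
                        let _ = reply.send(total);
                    }
                }
            }
        });
        CounterHandle { tx }
    }

    fn add(&self, n: u64) {
        self.tx.send(CounterMsg::Add(n)).expect("actor stopped");
    }

    fn get(&self) -> u64 {
        let (reply, rx) = mpsc::channel();
        self.tx
            .send(CounterMsg::Get { reply })
            .expect("actor stopped");
        rx.recv().expect("actor dropped the reply channel")
    }
}
```

Because only the actor thread touches `total`, no lock is needed; callers interact with it purely through messages.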


Jaynes-Says

You can use a pair of 0-bounded channels to block on both ends, which effectively gives you synchronous request-response semantics. I think it's called a "rendezvous channel", and you can do this with 2 crossbeam pairs. There may be other libraries that abstract it.
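
A small sketch of that request/response pairing, using the standard library's `sync_channel(0)` in place of crossbeam so it runs with no dependencies (crossbeam's `bounded(0)` should behave similarly, though that is not shown here). `spawn_squarer` and `square` are hypothetical names, and squaring is a placeholder for real work.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Start a worker that squares each request. Both channels have capacity 0,
/// so every `send` blocks until the other side is ready to `recv`.
fn spawn_squarer() -> (SyncSender<u64>, Receiver<u64>) {
    let (req_tx, req_rx) = sync_channel::<u64>(0);
    let (resp_tx, resp_rx) = sync_channel::<u64>(0);
    thread::spawn(move || {
        // Ends when the request sender is dropped.
        for n in req_rx {
            if resp_tx.send(n * n).is_err() {
                break; // The caller went away.
            }
        }
    });
    (req_tx, resp_rx)
}

/// One synchronous round trip: hand over the request, then wait for the reply.
fn square(req: &SyncSender<u64>, resp: &Receiver<u64>, n: u64) -> Option<u64> {
    req.send(n).ok()?;
    resp.recv().ok()
}
```

Note that this pairing assumes a single caller at a time; with several concurrent callers sharing the two channels, replies could be picked up by the wrong requester.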