Akka Streams is a library for building and processing streams of data on top of the actor-based Akka toolkit. It implements Reactive Streams, a specification for asynchronous, non-blocking stream processing with backpressure.
Akka Streams is designed to handle large or infinite streams of data efficiently, allowing developers to define and execute stream-based workflows in a modular and composable manner.
val source = Source(1 to 10) // Emits the numbers 1 to 10
val flow = Flow[Int].map(_ * 2) // Multiplies each element by 2
val sink = Sink.foreach[Int](println) // Prints each element
val runnableGraph = source.via(flow).to(sink)
runnableGraph.run()
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Sink, Flow}
import akka.stream.Materializer
implicit val system: ActorSystem = ActorSystem("AkkaStreamsExample")
implicit val materializer: Materializer = Materializer(system)
val source = Source(1 to 10) // Emit numbers from 1 to 10
val flow = Flow[Int].map(_ * 2) // Multiply each number by 2
val sink = Sink.foreach[Int](println) // Print each number
val runnableGraph = source.via(flow).to(sink)
runnableGraph.run() // Materializes and runs the stream
2
4
6
8
10
12
14
16
18
20
Reactive Streams is a standard specification for asynchronous, non-blocking stream processing with backpressure. It defines a small set of interfaces and rules for building and handling reactive data streams consistently, which lets different implementations (such as Akka Streams, Project Reactor, and RxJava) interoperate.
Backpressure is the ability of a Subscriber to signal the Publisher how many data items it can handle at a time. This prevents the Publisher from overwhelming the Subscriber with more data than it can process.
For example, a Subscriber that can handle five items at a time calls request(5) on its Subscription.
import org.reactivestreams.*;

public class ReactiveStreamsExample {
    public static void main(String[] args) {
        // A minimal publisher of the numbers 1 to 10 (a demo, not a spec-complete implementation)
        Publisher<Integer> publisher = subscriber -> {
            subscriber.onSubscribe(new Subscription() {
                private int next = 1;         // next value to emit, kept across request() calls
                private boolean done = false; // set once the stream completes or is canceled

                @Override
                public void request(long n) {
                    // Emit at most n items, continuing where the previous request stopped
                    for (long i = 0; i < n && !done; i++) {
                        subscriber.onNext(next++);
                        if (next > 10) {
                            done = true;
                            subscriber.onComplete();
                        }
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                    System.out.println("Subscription canceled");
                }
            });
        };

        Subscriber<Integer> subscriber = new Subscriber<>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("Subscribed");
                subscription.request(5); // Request 5 items
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Received: " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Error: " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Complete");
            }
        };

        publisher.subscribe(subscriber);
    }
}
Subscribed
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Widely used libraries and frameworks that implement or build on Reactive Streams include:
Akka Streams
Project Reactor
RxJava
Spring WebFlux
Vert.x
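Since Java 9 the JDK also ships the same four interfaces as nested types of java.util.concurrent.Flow. As a rough sketch of the backpressure described earlier, the following uses those JDK interfaces to show a Subscriber that asks for items in small batches and only requests more once a batch has been handled. The class names (BatchedDemandExample, RangePublisher, BatchingSubscriber) and the batch size are hypothetical, and the publisher is a single-threaded illustration rather than a spec-complete implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BatchedDemandExample {

    /** Publishes the integers 1..max, but never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int max;

        RangePublisher(int max) { this.max = max; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;             // next value to emit
                private long demand = 0;          // items requested but not yet delivered
                private boolean emitting = false; // guards against re-entrant request() calls
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;   // simplified: the spec signals onError for n <= 0
                    demand += n;
                    if (emitting) return;         // called from inside onNext: the loop below picks it up
                    emitting = true;
                    while (!done && demand > 0 && next <= max) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > max) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests items in fixed-size batches and records every signal in a log. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private final int batchSize;
        private Flow.Subscription subscription;
        private int receivedInBatch = 0;

        BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            log.add("request " + batchSize);
            s.request(batchSize);
        }

        @Override
        public void onNext(Integer item) {
            log.add("onNext " + item);
            if (++receivedInBatch == batchSize) { // batch handled: signal new demand
                receivedInBatch = 0;
                log.add("request " + batchSize);
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable t) { log.add("onError " + t.getMessage()); }

        @Override
        public void onComplete() { log.add("onComplete"); }
    }

    /** Wires a publisher of 1..max to a subscriber with the given batch size and returns its log. */
    static List<String> run(int max, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        new RangePublisher(max).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

In the log, every group of onNext lines is preceded by a request line, which shows that the publisher emits only in response to demand. A production publisher would also need to handle concurrency, demand overflow, and invalid requests as the specification requires.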
Akka Streams and Akka Actors are both parts of the Akka toolkit, but they serve different purposes and address different programming paradigms. Here’s a detailed comparison of the two:
Akka Actors:
import akka.actor.Actor

class MyActor extends Actor {
  def receive: Receive = {
    case msg: String => println(s"Received message: $msg")
    case _ => println("Unknown message")
  }
}
Akka Streams:
val source = Source(1 to 10)
val sink = Sink.foreach[Int](println)
val graph = source.to(sink) // a RunnableGraph: the Source connected to the Sink
graph.run()
A RunnableGraph represents a reusable blueprint of a stream that can be materialized multiple times. The relationship between the two is that a RunnableGraph requires a Materializer to execute its logic. Since Akka 2.6, a default system-wide Materializer is provided by SystemMaterializer.
RunnableGraphs are built by connecting sources, flows, and sinks directly or with the GraphDSL. They encapsulate the entire stream topology and can be run multiple times with different materializers or configurations. This reusability supports testing, benchmarking, and modularization of complex stream setups.
In short, the Materializer executes the RunnableGraph, which in turn defines the structure and processing logic of the stream.
val decider: Supervision.Decider = {
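  case _: ArithmeticException => Supervision.Resume // drop the failing element and continue
  case _ => Supervision.Stop // fail the stream
}
// Alternative: restart the stage (clearing its state) on IllegalStateException
val decider: Supervision.Decider = {
  case _: IllegalStateException => Supervision.Restart
  case _ => Supervision.Stop
}
// Alternative: stop on IllegalArgumentException, resume on anything else
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Stop
  case _ => Supervision.Resume
}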
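To make the effect of the deciders above concrete, here is a small model of Resume and Stop written in plain Java, the language of the Reactive Streams example. It does not use the Akka API: SupervisionSketch, Directive, Result, and mapWithSupervision are hypothetical names invented for this sketch, and Restart is not modeled. It assumes a map-like stage that consults the decider whenever the mapping function throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class SupervisionSketch {

    /** The two directives modeled here (hypothetical stand-ins for Supervision.Resume and Supervision.Stop). */
    enum Directive { RESUME, STOP }

    /** What reached downstream, plus the failure that ended the run (null if it completed normally). */
    static final class Result {
        final List<Integer> emitted;
        final Throwable failure;

        Result(List<Integer> emitted, Throwable failure) {
            this.emitted = emitted;
            this.failure = failure;
        }
    }

    /** Resume on ArithmeticException, stop on anything else (mirrors the first decider above). */
    static final Function<Throwable, Directive> RESUME_ON_ARITHMETIC =
        t -> (t instanceof ArithmeticException) ? Directive.RESUME : Directive.STOP;

    /** Applies f to each element and asks the decider what to do when f throws. */
    static Result mapWithSupervision(List<Integer> input,
                                     Function<Integer, Integer> f,
                                     Function<Throwable, Directive> decider) {
        List<Integer> emitted = new ArrayList<>();
        for (Integer element : input) {
            try {
                emitted.add(f.apply(element));
            } catch (RuntimeException e) {
                if (decider.apply(e) == Directive.STOP) {
                    return new Result(emitted, e); // STOP: the run ends with this failure
                }
                // RESUME: the failing element is dropped and processing continues
            }
        }
        return new Result(emitted, null);
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 0, 5);

        Result resumed = mapWithSupervision(input, x -> 10 / x, RESUME_ON_ARITHMETIC);
        System.out.println("Resume: " + resumed.emitted + ", failure = " + resumed.failure);

        Result stopped = mapWithSupervision(input, x -> 10 / x, t -> Directive.STOP);
        System.out.println("Stop: " + stopped.emitted + ", failure = " + stopped.failure);
    }
}
```

With the resuming decider, the division by zero is skipped and the remaining elements still flow through. With a decider that always stops, processing ends at the first failure and only the elements emitted before it reach downstream.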
val flowWithSupervision = myFlow.withAttributes(ActorAttributes.supervisionStrategy(decider))
Materialization is triggered when the run() method is called on a RunnableGraph, which instantiates and connects all stages of the graph.
val source = Source(1 to 10)
val flow = Flow[Int].map(_ * 2)
val sink = Sink.foreach[Int](println)
val runnableGraph = source.via(flow).to(sink)
runnableGraph.run()