I have the following Use Case.
A master Actor which maintains some global system state.
A number of TCP clients that connect to the system.
Each client can send data that modifies the global state and must receive
all the updates made by all clients, timers, etc.
The concept was to create a stream-based TCP server using a Flow
constructed by wrapping an ActorSubscriber and ActorPublisher
Hello there,
I could not find an answer to this particular question either in the docs or in
this group:
Is it possible to wire Sink.actorSubscriber(...) into
Source.actorPublisher(...) in one (Partial-) Flow? And by 'wire' I mean to
somehow send a message from the ActorSubscriber, instantiated from the given props,
to the ActorPublisher of the same flow instance?
I was thinking about giving some
Hi,
I would like to have an actor in my FlowGraph which could consume data from
a source, modify the data, and then behave as a source itself, with
back-pressure working properly.
Something like this:
in ~> actor ~> out
Is there a way of doing this? I know that it can be an
ActorPublisher or an ActorSubscriber, like this:
actor ~> out
or
in ~> actor
but I need my actor in the middle.
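One way to get the `in ~> actor ~> out` shape with back-pressure is to keep the actor outside the graph and call it from a `mapAsync` stage using the ask pattern. The following is only a minimal sketch, not the ActorPublisher/ActorSubscriber approach from the post: it assumes a recent Akka release (2.6 or later, where the materializer is derived from the implicit ActorSystem) rather than the experimental API, and the names `Doubler`, `ActorInTheMiddle` and `run` are hypothetical.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical worker actor: modifies each element and replies to the sender.
class Doubler extends Actor {
  def receive: Receive = {
    case n: Int => sender() ! n * 2
  }
}

object ActorInTheMiddle {
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6+, so the implicit system also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val timeout: Timeout = 3.seconds

    val worker = system.actorOf(Props(new Doubler), "doubler")

    // At most 4 asks are in flight at once; upstream is back-pressured
    // until the actor replies, and results keep upstream order.
    val viaActor = Flow[Int].mapAsync(parallelism = 4)(n => (worker ? n).mapTo[Int])

    try Await.result(Source(1 to 5).via(viaActor).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The trade-off of this design is that the actor must reply to every element. If it fails to reply within the timeout, the stage fails the stream, so a supervision strategy or a longer timeout may be needed in practice.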
Hi,
If ActorX needs to pass a load of data to be processed to a remote ActorY,
is this the correct thing to do?
Imagine the data is rows of huge files that are downloaded on ActorX and are to
be processed and indexed on ActorY.
trait IndexReq {
  def source: Source[Row]
  def flow: Flow[Row, Rec] // processing from a row to a record
}
trait IndexRes[T] {
  def source: Source[T]
}
So
Hello All,
We are using Akka Streams to process 400,000 XML documents, run them through
a series of transformations, and then save them to a database. We are using
basic transformations, and here is how our stream code looks:
Source(Set(allDocumentUris))
  .map(uri => getDocumentFromNetwork(uri))
  .map(doc => transformation1(doc))
  .map(doc => saveToDatabase(doc))
  .runWith
I took a page out of the stream-integrations documentation, and hooked a
simple ActorSubscriber up as a sink to a Flow. Something like:
class TestActorSubscriber extends ActorSubscriber {
  override def postStop() = {
    println("Stopped " + self.toString)
    super.postStop()
  }
  override def preStart() = {
    println("Started " + self.toString)
    super.preStart()
  }
  override protected
I'm cutting my teeth on Akka Streams, building the quintessential Twitter
streaming client. What I'm trying to do is some processing on the tweet
stream and then based on the result, divide the stream into "good" and
"bad" streams. Both streams are directed to the same ActorPublisher class
instantiated with different names. Following is my code so far but I'm at a
loss connecting all the
Hi,
I would like to do pretty much something like this:
http://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-integrations.html#actorsubscriber
where there is an ActorSubscriber that has some children... I would like to
control deployment of the children via external configuration (one example
could be a pool of workers...).
akka.actor.deployment {
/?actor
Hi guys,
I'm trying to create an actor that would both subscribe to a stream and persist incoming events. Extending both ActorSubscriber and PersistentActor does not seem to work. Is there any other way I could achieve this? Using two actors, one receiving and the other persisting, is a solution, but then I would need to manage the back-pressure between them manually. What is the general recommended pattern