akka/akka-core

Source.fromPublisher does not pick up attributes correctly

Open

#30.076 aberto em 3 de mar. de 2021

Ver no GitHub
 (12 comments) (0 reactions) (0 assignees)Scala (3.547 forks)batch import
1 - triagedhelp wantedt:stream

Métricas do repositório

Stars
 (13.277 stars)
Métricas de merge de PR
 (Mesclagem média 17h 35m) (11 fundiu PRs em 30d)

Description

Looks like Source.fromPublisher buffers some data from the source. I couldn't find any way to limit or disable the buffer size.

  val source3 = Source.fromPublisher {
    Source.fromIterator(() => Iterator.from(1))
      .takeWhile(_ < 50)
      .map { i =>
        println(s"internal akka: $i")
        i
      }
      .runWith(Sink.asPublisher(false))
  }.map { i =>
    Thread.sleep(2000)
    println(s"akka: $i")
    i
  }

log:

internal akka: 1
internal akka: 2
internal akka: 3
internal akka: 4
internal akka: 5
internal akka: 6
internal akka: 7
internal akka: 8
internal akka: 9
akka: 1
akka: 2
akka: 3
.
.
.
.

Originally I faced with this issue converting from Monix reactive streams to Akka-streams.

Guia do colaborador