akka/akka-http

Investigate potential connection leak in HTTP/1.1 when HTTP/2 is enabled

Open

#3963, created on December 6, 2021

Labels: 1 - triaged, help wanted, t:core, t:http2, technical-debt

Description

Found while running ClientServerSpec: the test "complete a request/response when request has Connection: close set" fails because stages are leaked. The test output reports the following still-active shell:

activeShells (actor: akka://ClientServerSpecBase/system/StreamSupervisor-0/flow-11-0-unnamed)
  GraphInterpreterShell(
  logics: [
    ServerImpl.netIn attrs: [Name(SubSink%28ServerImpl.netIn%29), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    ServerImpl.netOut attrs: [Name(SubSource%28ServerImpl.netOut%29), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92) attrs: [Name(mapAsyncUnordered), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    One2OneBidi attrs: [Name(One2OneBidi), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460 attrs: [Name(RequestTimeoutSupport), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Identity attrs: [Name(identityOp), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320 attrs: [CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175 attrs: [Name(ControllerStage), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062 attrs: [Name(renderer), Name(renderer), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)]
    Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018) attrs: [Name(map), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    HttpRequestParser attrs: [Name(HttpRequestParser), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79 attrs: [Name(ProtocolSwitchStage), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Map(SendBytes) attrs: [Name(map), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Collect attrs: [Name(collect), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Log attrs: [LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Log attrs: [LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Identity attrs: [Name(identityOp), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509 attrs: [CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)]
  ],
  connections: [
    Connection(0, ServerImpl.netIn, Identity, Closed)
    Connection(1, MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92), One2OneBidi, Closed)
    Connection(2, One2OneBidi, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, Closed)
    Connection(3, One2OneBidi, MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92), Closed)
    Connection(4, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320, Closed)
    Connection(5, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, One2OneBidi, Closed)
    Connection(6, Identity, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, Closed)
    Connection(7, akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Closed)
    Connection(8, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018), Closed)
    Connection(9, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Identity, Closed)
    Connection(10, akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Closed)
    Connection(11, Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018), HttpRequestParser, Closed)
    Connection(12, HttpRequestParser, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Closed)
    Connection(13, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062, Closed)
    Connection(14, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Collect, Closed)
    Connection(15, Map(SendBytes), akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Closed)
    Connection(16, Collect, Log, Closed)
    Connection(17, Log, Map(SendBytes), Closed)
    Connection(18, Log, akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509, Closed)
    Connection(19, Identity, Log, Closed)
    Connection(20, akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509, ServerImpl.netOut, ShouldPush
  ]
)

dot format graph for deadlock analysis:
================================================================
digraph waits {
  N0 [label="ServerImpl.netIn"];
  N1 [label="ServerImpl.netOut"];
  N2 [label="MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92)"];
  N3 [label="One2OneBidi"];
  N4 [label="akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460"];
  N5 [label="Identity"];
  N6 [label="akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320"];
  N7 [label="akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175"];
  N8 [label="akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062"];
  N9 [label="Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018)"];
  N10 [label="HttpRequestParser"];
  N11 [label="akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79"]
  N12 [label="Map(SendBytes)"];
  N13 [label="Collect"];
  N14 [label="Log"];
  N15 [label="Log"];
  N16 [label="Identity"];
  N17 [label="akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509"];
  N0 -> N16 [style=dotted, label=closed, dir=both];
  N2 -> N3 [style=dotted, label=closed, dir=both];
  N3 -> N4 [style=dotted, label=closed, dir=both];
  N3 -> N2 [style=dotted, label=closed, dir=both];
  N4 -> N6 [style=dotted, label=closed, dir=both];
  N4 -> N3 [style=dotted, label=closed, dir=both];
  N5 -> N4 [style=dotted, label=closed, dir=both];
  N6 -> N7 [style=dotted, label=closed, dir=both];
  N7 -> N9 [style=dotted, label=closed, dir=both];
  N7 -> N5 [style=dotted, label=closed, dir=both];
  N8 -> N7 [style=dotted, label=closed, dir=both];
  N9 -> N10 [style=dotted, label=closed, dir=both];
  N10 -> N11 [style=dotted, label=closed, dir=both];
  N11 -> N8 [style=dotted, label=closed, dir=both];
  N11 -> N13 [style=dotted, label=closed, dir=both];
  N12 -> N11 [style=dotted, label=closed, dir=both];
  N13 -> N15 [style=dotted, label=closed, dir=both];
  N14 -> N12 [style=dotted, label=closed, dir=both];
  N15 -> N17 [style=dotted, label=closed, dir=both];
  N16 -> N14 [style=dotted, label=closed, dir=both];
  N17 -> N1 [label=shouldPush, color=red];
}
================================================================
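
In the snapshot above, every connection is Closed except connection 20 (DelayCancellationStage to ServerImpl.netOut), which is in ShouldPush and appears to be what keeps the shell alive. As a reading aid for similar dumps, here is a minimal sketch that scans the `connections:` lines and reports those not yet closed. The object and method names (`OpenConnections`, `parse`, `notClosed`) are hypothetical helpers, not part of akka-http, and the parsing assumes the line format shown in this issue, including the missing closing parenthesis on the last entry.

```scala
object OpenConnections {
  // Extracts (id, state) from a line such as
  //   "Connection(20, SomeStage, ServerImpl.netOut, ShouldPush"
  // Stage names may themselves contain commas and parentheses, so only the
  // leading id and the trailing state are read.
  def parse(line: String): Option[(Int, String)] = {
    val trimmed = line.trim
    if (!trimmed.startsWith("Connection(")) None
    else {
      val body = trimmed.stripPrefix("Connection(").stripSuffix(")")
      val id = body.takeWhile(_.isDigit)
      val sep = body.lastIndexOf(", ")
      if (id.isEmpty || sep < 0) None
      else Some((id.toInt, body.substring(sep + 2)))
    }
  }

  // Returns the connections whose state is anything other than "Closed".
  def notClosed(lines: Seq[String]): Seq[(Int, String)] =
    lines.flatMap(parse).filter { case (_, state) => state != "Closed" }
}
```

Applied to the connection lines of the dump above, `OpenConnections.notClosed` would return only connection 20 with state `ShouldPush`.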
