twitter/finagle

Finagle redis client subscribe command fails in 18.6.0

Open

#704 创建于 2018年7月4日

在 GitHub 查看
 (8 评论) (0 反应) (0 负责人)Scala (8,864 star) (1,435 fork)batch import
help wanted

描述

We tried use subscribe command with auth in redis and got this error: NOAUTH Authentication required.

Expected behavior

Should work subscribe without errors in finagle-redis with auth

Actual behavior

My app can't make any subscription and we got this exception in log:

SEVERE: Exception propagated to the root monitor!
java.lang.IllegalArgumentException: Unexpected reply type: ErrorReply
	at com.twitter.finagle.redis.exp.SubscribeCommands$SubscriptionManager.onMessage(SubscribeClient.scala:242)
	at com.twitter.finagle.redis.exp.SubscribeDispatcher.$anonfun$loop$1(SubscribeDispatcher.scala:20)
	at com.twitter.finagle.redis.exp.SubscribeDispatcher.$anonfun$loop$1$adapted(SubscribeDispatcher.scala:18)
	at com.twitter.util.Promise$Monitored.apply(Promise.scala:202)
	at com.twitter.util.Promise$Monitored.apply(Promise.scala:198)
	at com.twitter.util.Promise$WaitQueue.com$twitter$util$Promise$WaitQueue$$run(Promise.scala:90)
	at com.twitter.util.Promise$WaitQueue$$anon$5.run(Promise.scala:85)
	at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:198)
	at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157)
	at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:274)
	at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:109)
	at com.twitter.util.Promise$WaitQueue.runInScheduler(Promise.scala:85)
	at com.twitter.util.Promise$WaitQueue.runInScheduler$(Promise.scala:84)
	at com.twitter.util.Promise$Transformer.runInScheduler(Promise.scala:215)
	at com.twitter.util.Promise.updateIfEmpty(Promise.scala:739)
	at com.twitter.util.Promise.update(Promise.scala:711)
	at com.twitter.util.Promise.setValue(Promise.scala:687)
	at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:122)
	at com.twitter.finagle.netty4.transport.ChannelTransport$$anon$2.channelRead(ChannelTransport.scala:188)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at com.twitter.finagle.netty4.codec.BufCodec$.channelRead(BufCodec.scala:65)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at com.twitter.finagle.util.BlockingTimeTrackingThreadFactory$$anon$1.run(BlockingTimeTrackingThreadFactory.scala:23)
	at java.lang.Thread.run(Thread.java:745)

Steps to reproduce the behavior

We use this environment: finagle-redis: 18.6.0 redis: 4.0.9

Test:

  import com.twitter.util.Await
  import com.twitter.finagle.Redis
  import com.twitter.io.Buf

  "Subscription with auth" should "work correctly" in {

    val r = Redis.client.newRichClient("localhost:6379")
    Await.result(r.auth(Buf.Utf8("mypass")))

    val subscriber = r.subscribe(Seq(Buf.Utf8("test"))) { case (_: Buf, message: Buf) =>
      println(Buf.Utf8.unapply(message).get)
    }

    Await.result(subscriber)
    Await.result(r.publish(Buf.Utf8("test"), Buf.Utf8("test")))
  }

贡献者指南