Why doesn't my code return anything? Scala fs2
The program is meant to map Ints to Doubles, push them onto a queue, and record the time at which each one leaves the queue. It does not show any error, but it is not printing anything. What am I missing?
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random

class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {
  val streamData = Stream.emit(1)
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  def storeInQueue: Stream[IO, Unit] = {
    scheduledStream
      .map { n =>
        val entryTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n.toDouble, entryTime)
      }
      .through(q1.enqueue)
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
    q1.dequeue
      .evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
      .map { n =>
        val exitTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n._1, exitTime)
      }
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
  }
}

object Five2 extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, (Double, IO[Long])](1)
      b = new Tst(q)
      _ <- b.storeInQueue.compile.drain
    } yield ()
    program.as(ExitCode.Success)
  }
}
IO is evaluated lazily: for something to be executed, it has to be part of the expression that builds the final IO value.
Here:
def storeInQueue: Stream[IO, Unit] = {
  scheduledStream ... // no side effects are run when we create this!
  q1.dequeue ... // not using scheduledStream
}
the stream built from scheduledStream is not used at all. A Scala block evaluates to its last expression only, so that stream is constructed and then discarded; it is not "a part" of the value returned from storeInQueue. When IOApp turns the IO value into computations, the recipe for your program does not contain the part where messages are pushed onto the queue, so the queue is always empty.
The part that subscribes to the queue works, but since nothing ever lands on the queue, it keeps waiting for items that will never arrive.
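The same effect can be reproduced in isolation. The following is a minimal sketch, assuming cats-effect 2.x (the version implied by the Timer import in the question); the object DiscardedEffect, its log buffer, and the method names onlyLast and both are hypothetical and exist only for illustration.

```scala
import cats.effect.IO
import scala.collection.mutable.ListBuffer

object DiscardedEffect {
  // Records which effects actually ran (illustrative only).
  val log = ListBuffer.empty[String]

  // Same shape as the original storeInQueue: two IO values are built,
  // but the block evaluates to its last expression only.
  def onlyLast: IO[Unit] = {
    IO { log += "push"; () } // built, then discarded: never part of the result
    IO { log += "pull"; () } // last expression: the value the block returns
  }

  // Both effects are chained into one IO value, so both run.
  def both: IO[Unit] =
    IO { log += "push"; () }.flatMap(_ => IO { log += "pull"; () })

  def main(args: Array[String]): Unit = {
    onlyLast.unsafeRunSync()
    println(log.toList) // List(pull)
    log.clear()
    both.unsafeRunSync()
    println(log.toList) // List(push, pull)
  }
}
```

In onlyLast the "push" effect is described but never becomes part of the returned IO, which is what happens to the enqueueing stream in the question.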
You would have to start both streams by making them part of one IO value, e.g. like this:
class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {
  val streamData = Stream.emit(1)
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  // Producer: pushes (value, entry time) pairs onto the queue.
  def storeInQueue =
    scheduledStream
      .map { n =>
        val entryTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n.toDouble, entryTime)
      }
      .through(q1.enqueue)
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))

  // Consumer: pulls from the queue after a random delay.
  def takeFromQueue =
    q1.dequeue
      .evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
      .map { n =>
        val exitTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n._1, exitTime)
      }
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
}
object Five2 extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, (Double, IO[Long])](1)
      b = new Tst(q)
      pushFiber <- b.storeInQueue.compile.drain.start // run as fiber
      pullFiber <- b.takeFromQueue.compile.drain.start // run as fiber
      // Wait for the fibers; otherwise run can complete right after starting them.
      _ <- pushFiber.join
      _ <- pullFiber.join
    } yield ()
    program.as(ExitCode.Success)
  }
}
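As an alternative to starting two fibers by hand, fs2 can run one stream in the background of another with concurrently. The following is a minimal self-contained sketch, assuming fs2 2.x and cats-effect 2.x as in the code above; the object name ConcurrentlySketch, the one-second delay, the Double-only queue, and the take(3) cutoff are illustrative choices rather than part of the original program.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.duration._

object ConcurrentlySketch extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    for {
      q <- Queue.bounded[IO, Double](1)
      // Producer: emits 1.0 on a fixed delay and enqueues it.
      producer = Stream
        .fixedDelay[IO](1.second)
        .as(1.0)
        .evalTap(n => IO(println(s"Pushing $n to queue")))
        .through(q.enqueue)
      // Consumer: prints every element taken from the queue.
      consumer = q.dequeue
        .evalMap(n => IO(println(s"Pulling $n from queue")))
      // The producer runs in the background of the consumer; the combined
      // stream stops (and cancels the producer) after three pulled elements.
      _ <- consumer.concurrently(producer).take(3).compile.drain
    } yield ExitCode.Success
}
```

Because both streams are combined into a single Stream before compile.drain, there is only one IO value to run, and no fiber has to be joined explicitly.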