How do I handle buffered read/write streams to peers in Go with libp2p?

Problem description:

I am following this tutorial:

https://github.com/libp2p/go-libp2p-examples/tree/master/chat-with-mdns

In a short form, it:

  1. configures a p2p host
  2. sets a default handler function for incoming connections (3. not necessary)
  3. and opens a stream to the connecting peers:

stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

Afterwards, there is a buffer stream/read-write variable created:

rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

Now this stream is used to send and receive data between the peers. This is done using two goroutine functions that have rw as an input:

go writeData(rw)
go readData(rw)

My problems are:

  1. I want to send data to my peers and need feedback from them: e.g. in rw there is a question and they need to answer yes/no. How can I transfer back this answer and process it (enable some interaction)?

  2. The data I want to send in rw is not always the same. Sometimes it's a string containing only a name, sometimes it's a string containing a whole block etc. How can I distinguish?

I thought about those solutions. But I am new to golang, so maybe you have a better one:

  • do I need a new stream for every different content: stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

  • do I need to open more buffered rw variables for every different content: rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

  • are there any other solutions?

Thank you for any help to solve this!!

This is what readData does in your tutorial:

func readData(rw *bufio.ReadWriter) {
    for {
        str, err := rw.ReadString('\n')
        if err != nil {
            fmt.Println("Error reading from buffer")
            panic(err)
        }

        if str == "" {
            return
        }
        if str != "\n" {
            // Green console colour:    \x1b[32m
            // Reset console colour:    \x1b[0m
            fmt.Printf("\x1b[32m%s\x1b[0m> ", str)
        }

    }
}

It basically reads from the stream until it finds a \n, which is the newline character, and prints what it read to stdout.

The writeData:

func writeData(rw *bufio.ReadWriter) {
    stdReader := bufio.NewReader(os.Stdin)

    for {
        fmt.Print("> ")
        sendData, err := stdReader.ReadString('\n')
        if err != nil {
            fmt.Println("Error reading from stdin")
            panic(err)
        }

        _, err = rw.WriteString(fmt.Sprintf("%s\n", sendData))
        if err != nil {
            fmt.Println("Error writing to buffer")
            panic(err)
        }
        err = rw.Flush()
        if err != nil {
            fmt.Println("Error flushing buffer")
            panic(err)
        }
    }
}

It reads data from stdin, so you can type messages, writes each line to rw and flushes it. This gives you a simple terminal chat. If it works correctly, you should be able to start at least two peers and have them communicate through stdin.
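
The Flush call matters because a bufio.Writer keeps written bytes in memory until its buffer fills up or Flush is called. Here is a minimal sketch of that behaviour. It assumes a bytes.Buffer as a stand-in for the libp2p stream so it runs without a network, and pendingAndSent is a made-up helper name:

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// pendingAndSent writes msg through a bufio.ReadWriter and reports how many
// bytes had reached the underlying "stream" before and after Flush.
func pendingAndSent(msg string) (before, after int, err error) {
	var stream bytes.Buffer // stand-in for the network stream (assumption)
	rw := bufio.NewReadWriter(bufio.NewReader(&stream), bufio.NewWriter(&stream))

	if _, err = rw.WriteString(msg); err != nil {
		return 0, 0, err
	}
	before = stream.Len() // the message is still held inside rw

	if err = rw.Flush(); err != nil {
		return 0, 0, err
	}
	after = stream.Len() // the message has now been handed to the stream
	return before, after, nil
}

func main() {
	before, after, err := pendingAndSent("hello\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(before, after)
}
```

Without the Flush, a short message would sit in the local buffer and the peer would never see it.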

You don't need to create a new rw for each kind of content. You can reuse the existing one until you close the stream. In the tutorial's code, a new rw is created for each new peer.


A TCP stream does not work like an HTTP request, where every request has a response that corresponds to it. So if you want to send something and get the response to that specific question, you could send a message in this format:

[8 bytes unique ID][content of the message]

And when you receive it, you parse it, prepare the response and send it with the same format, so that you can match messages, creating a sort of request/response communication.
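
On the asking side, matching a reply to its question can be done with a small table of pending IDs. The following is only a sketch of that idea, not something from the tutorial: pendingReplies, expect and deliver are hypothetical names, and it assumes the goroutine that reads from the stream calls deliver for every message it receives.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingReplies tracks requests that are still waiting for an answer,
// keyed by the message ID that was sent with the request.
type pendingReplies struct {
	mu      sync.Mutex
	waiting map[uint64]chan []byte
}

func newPendingReplies() *pendingReplies {
	return &pendingReplies{waiting: make(map[uint64]chan []byte)}
}

// expect registers id and returns the channel its reply will arrive on.
// Call it before sending the request.
func (p *pendingReplies) expect(id uint64) <-chan []byte {
	ch := make(chan []byte, 1) // buffered so deliver never blocks
	p.mu.Lock()
	p.waiting[id] = ch
	p.mu.Unlock()
	return ch
}

// deliver hands a received message to whoever is waiting for id.
// It returns false when nobody is waiting, which means the message is
// not a reply to one of our own requests.
func (p *pendingReplies) deliver(id uint64, content []byte) bool {
	p.mu.Lock()
	ch, ok := p.waiting[id]
	delete(p.waiting, id)
	p.mu.Unlock()
	if ok {
		ch <- content
	}
	return ok
}

func main() {
	pending := newPendingReplies()

	reply := pending.expect(42)                     // about to send question 42
	fmt.Println(pending.deliver(7, []byte("hi")))   // unrelated message
	fmt.Println(pending.deliver(42, []byte("yes"))) // the answer arrives
	fmt.Println(string(<-reply))
}
```

This assumes the two peers never pick the same ID for their own requests. A flag in the message that marks it as a request or a reply would remove that assumption.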

The sending and reading sides can look something like this:

// needs the "bufio", "encoding/binary" and "fmt" imports
func sendMsg(rw *bufio.ReadWriter, id int64, content []byte) error {
    // allocate the message: 8 bytes of ID + the content + 1 byte for the new line
    msg := make([]byte, 8+len(content)+1)

    // write the ID into the first 8 bytes
    binary.LittleEndian.PutUint64(msg, uint64(id))

    // copy the content right after the ID
    copy(msg[8:], content)

    // add the new line that marks the end of the message
    // (the ID and the content must not contain a '\n' byte themselves)
    msg[len(msg)-1] = '\n'

    // write msg to stream
    _, err := rw.Write(msg)
    if err != nil {
        fmt.Println("Error writing to buffer")
        return err
    }
    err = rw.Flush()
    if err != nil {
        fmt.Println("Error flushing buffer")
        return err
    }
    return nil
}

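func readMsg(rw *bufio.ReadWriter) {
    for {
        // read bytes until new line
        msg, err := rw.ReadBytes('\n')
        if err != nil {
            // the stream is closed or broken, so stop instead of looping forever
            fmt.Println("Error reading from buffer:", err)
            return
        }

        // a valid message holds at least the 8-byte ID and the new line
        if len(msg) < 9 {
            fmt.Println("Message too short, skipping it")
            continue
        }

        // get the id
        id := int64(binary.LittleEndian.Uint64(msg[0:8]))

        // get the content, last index is len(msg)-1 to remove the new line char
        content := string(msg[8 : len(msg)-1])

        if content != "" {
            // we print [message ID] content
            fmt.Printf("[%d] %s\n", id, content)
        }

        // here you could parse your message and prepare a response;
        // prepareResponse is a placeholder you write yourself, assumed
        // here to have the signature func(string) ([]byte, error)
        response, err := prepareResponse(content)
        if err != nil {
            fmt.Println("Err while preparing response:", err)
            continue
        }

        // note: if both peers run this loop unchanged, each one answers the
        // other's responses, so responses must be told apart from requests
        if err := sendMsg(rw, id, response); err != nil {
            fmt.Println("Err while sending response:", err)
            continue
        }
    }
}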

Hope this helps.