How do I correctly loop over a buffered channel in this case?

Problem description:

I am playing around with Go, trying to build a simple port scanner using only the standard library. This is more of an exercise than anything else, so please don't comment on the logic involved.

Looking at the following code:

package main

import (
    "flag"
    "fmt"
    "net"
    "time"
    "strings"
    "strconv"
    "log"
    "sync"
)

var commonPorts = map[int]string {
    21: "ftp",
    22: "sftp",
    80: "http",
    110: "pop3",
    143: "imap",
    443: "https",
    631: "ipp",
    993: "imaps",
    995: "pop3s",
}

type OP struct {
    mu sync.Mutex
    ports []string
}

func (o *OP) SafeAdd(port string) {
    o.mu.Lock()
    defer o.mu.Unlock()
    o.ports = append(o.ports, port)
}


func worker(host string, port int) string {
    address := fmt.Sprintf("%s:%d", host, port)

    conn, err := net.DialTimeout("tcp", address, time.Second * 3)
    if err != nil {
        return "" // port is closed or unreachable
    }
    conn.Close()

    stringI := strconv.Itoa(port)
    if name, ok := commonPorts[port]; ok {
        stringI += fmt.Sprintf("(%s)", name)
    }

    return stringI
}

func processWithChannels(host string) <-chan string {
    openPort := make(chan string, 1000)
    var wg sync.WaitGroup
    for i := 1; i <= 65535; i++ {
        wg.Add(1)
        go func(openPort chan string, host string, i int) {
            defer wg.Done()
            port := worker(host, i)
            if port != "" {
                openPort <- port
            }
        }(openPort, host, i)
    }
    wg.Wait()
    close(openPort)
    return openPort
}

func main() {
    var host = flag.String("host", "127.0.0.1", "please insert the host")
    var pType = flag.Int("type", 2, "please insert the type")

    flag.Parse()
    fmt.Printf("Scanning: %s...\n", *host)

    if _, err := net.LookupHost(*host); err != nil {
        log.Fatal(err)
    }

    openPorts := &OP{ports: []string{}}

    if *pType == 1 {

        ports := processWithChannels(*host)
        for port := range ports {
            openPorts.SafeAdd(port)
        }

    } else {

        var wg sync.WaitGroup
        for i := 1; i <= 65535; i++ {
            wg.Add(1)
            go func(o *OP, host string, i int){
                defer wg.Done()
                if port := worker(host, i); port != "" {
                    o.SafeAdd(port)
                }
            }(openPorts, *host, i)
        }
        wg.Wait()

    }

    if len(openPorts.ports) > 0 {
        fmt.Printf("Following ports are opened: %s\n", strings.Join(openPorts.ports, ", "))
    } else {
        fmt.Printf("No open port on the host: %s!\n", *host)
    }
}

There are two ways of starting a scan: either by using a buffered channel, or by using sync.WaitGroup and bailing out once all the scans are done.

It seems to me that in this case, using sync.WaitGroup makes more sense than using a buffered channel and looping over it until it's empty. However, with a buffered channel, I don't see a way to detect that nothing else will arrive on the channel and that I should break out of the for loop, except by using another sync.WaitGroup block.

I think my question is: if I want to use only the buffered channel solution, how do I implement it properly so that I know when the processing is done and can proceed with the rest of the code? (Please don't suggest timeouts.)

Here's also a small benchmark of the two types, in case anyone is interested:

MacBook-Pro:PortScanner c$ time ./PortScanner -host yahoo.com -type 1
Scanning: yahoo.com...
Following ports are opened: 80(http), 143(imap), 110(pop3), 995(pop3s), 993(imaps)

real    0m4.620s
user    0m1.193s
sys     0m1.284s
MacBook-Pro:PortScanner c$ time ./PortScanner -host yahoo.com -type 2
Scanning: yahoo.com...
Following ports are opened: 110(pop3), 80(http), 143(imap), 995(pop3s), 993(imaps)

real    0m4.055s
user    0m1.051s
sys     0m0.946s

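Answer:

The call to processWithChannels will hang if more than 1000 values have to go into the channel: the function blocks in wg.Wait() before returning, so nothing receives from the channel until every goroutine has finished, and any send beyond the buffer's capacity blocks forever. If you're going to use a buffered channel to hold all values until processing, it must have enough capacity to accept all of them.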

If you are going to collect all values into a single slice, then there's no reason to use a channel, and your second solution is just fine.

If you want to "stream" the ports back as soon as possible, then you need something in between the two solutions:

ports := make(chan string)

var wg sync.WaitGroup
for i := 1; i <= 65535; i++ {
    wg.Add(1)
    go func(i int) {
        defer wg.Done()
        if port := worker(*host, i); port != "" {
            ports <- port
        }
    }(i)
}

go func() {
    wg.Wait()
    close(ports)
}()

for port := range ports {
    fmt.Println("PORT:", port)
}

This, however, is likely to run into problems, such as missing open ports, because it dials all 65535 ports at the same time. Here is one possible pattern that uses a pool of workers to limit the number of concurrent dials:

ports := make(chan string)
toScan := make(chan int)
var wg sync.WaitGroup

// make 100 workers for dialing
for i := 0; i < 100; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for p := range toScan {
            ports <- worker(*host, p)
        }
    }()
}

// close our receiving ports channel once all workers are done
go func() {
    wg.Wait()
    close(ports)
}()

// feed the ports to the worker pool
go func() {
    for i := 1; i <= 65535; i++ {
        toScan <- i
    }
    // signal the workers to stop
    close(toScan)
}()

for port := range ports {
    if port != "" {
        fmt.Println("PORT:", port)
    }
}