绕过SSH服务器的端口转发限制

0x00 背景

在某些场景下SSH服务器会禁用掉端口转发的能力,以降低安全风险。这会导致很多依赖SSH端口转发的工具无法正常工作。

这里主要是修改了/etc/ssh/sshd_config文件中以下几项实现的:

#AllowAgentForwarding yes
#AllowTcpForwarding yes
#X11Forwarding yes

此时,SSH服务器基本就变成了只能执行shell命令的工具,无法用于建立通信通道。

是否有办法可以绕过这一限制呢?答案是肯定的。

0x01 借尸还魂

SSH最常用的能力就是交互式命令行,所谓交互式命令行,就是允许用户进行实时输入,并将输出实时展示出来。也就是说:交互式命令行本身就是一个双向通信的通道。因此,可以编写一个程序,它会在初始化时与指定的服务器端口建立Socket连接,然后将所有stdin读到的数据实时发送给Socket,并将Socket接收到的数据写到stdout中,stderr则用于输出控制信息和日志等。

根据上面的分析,这个程序其实跟telnet命令非常相似,但又不完全相同。因此用GO写了下面这个程序:

package main

import (
    "bufio"
    "fmt"
    "net"
    "os"
    "os/signal"
    "strconv"
    "sync"
    "time"
)

func telnet(host string, port int) int {
    conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), 10*time.Second)
    if err != nil {
        fmt.Fprintln(os.Stderr, fmt.Sprintf("[FAIL] Connect %s:%d failed: %s", host, port, err))
        return -1
    }
    defer conn.Close()
    fmt.Fprintln(os.Stderr, "[OKAY]")

    var wg sync.WaitGroup
    wg.Add(2)
    go handleWrite(conn, &wg)
    go handleRead(conn, &wg)
    wg.Wait()

    return 0
}

func handleRead(conn net.Conn, wg *sync.WaitGroup) int {
    defer wg.Add(-2)
    reader := bufio.NewReader(conn)
    buff := make([]byte, 4096)
    for {
        var bytes int
        var err error
        bytes, err = reader.Read(buff)
        if err != nil {
            fmt.Fprintln(os.Stderr, "Error to read from upstream because of", err)
            return -1
        }

        _, err = os.Stdout.Write(buff[:bytes])
        if err != nil {
            fmt.Fprintln(os.Stderr, "Error to write to stdout because of", err)
            return -1
        }
    }
}

func handleWrite(conn net.Conn, wg *sync.WaitGroup) int {
    defer wg.Add(-2)
    reader := bufio.NewReader(os.Stdin)
    buff := make([]byte, 4096)
    for {
        var bytes int
        var err error
        bytes, err = reader.Read(buff)
        if err != nil {
            fmt.Fprintln(os.Stderr, "Error to read from stdin because of", err)
            return -1
        }

        _, err = conn.Write(buff[:bytes])
        if err != nil {
            fmt.Fprintln(os.Stderr, "Error to write to upstream because of", err)
            return -1
        }
    }

}

func main() {
    if len(os.Args) < 3 {
        fmt.Fprintln(os.Stderr, "Usage: telnet host port")
        os.Exit(-1)
    }
    host := os.Args[1]
    port, _ := strconv.Atoi(os.Args[2])
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    go func(){
        for sig := range c {
            // sig is a ^C, handle it
            fmt.Fprintln(os.Stderr, "Signal", sig)
            os.Exit(0)
        }
    }()
    os.Exit(telnet(host, port))
}

完整的代码可以参考:telnet-go

0x02 暗度陈仓

要使用telnet-go提供的通信通道,需要与ParamikoASyncSSH之类的SSH库进行集成才行。下面是使用ASyncSSH进行集成的核心逻辑:

class SSHProcessTunnel(SSHTunnel):
    """SSH Tunnel Over Process StdIn and StdOut"""

    def __init__(self, tunnel, url, address):
        super(SSHProcessTunnel, self).__init__(tunnel, url, address)
        self._process = None

    @classmethod
    def has_cache(cls, url):
        return False

    async def _log_stderr(self):
        while not self.closed():
            error_line = await self._process.stderr.readline()
            error_line = error_line.strip()
            utils.logger.warn(
                "[%s][stderr] %s" % (self.__class__.__name__, error_line.decode())
            )
            await asyncio.sleep(0.5)
        self._process = None

    async def connect(self):
        ssh_conn = await self.create_ssh_conn()
        if not ssh_conn:
            return False
        bin_path = self._url.path
        cmdline = "%s %s %d" % (bin_path, self._addr, self._port)
        self._process = await ssh_conn.create_process(cmdline, encoding=None)
        await asyncio.sleep(0.5)

        if self._process.exit_status is not None and self._process.exit_status != 0:
            utils.logger.error(
                "[%s] Create process %s failed: [%d]%s"
                % (
                    self.__class__.__name__,
                    cmdline,
                    self._process.exit_status,
                    await self._process.stderr.read(),
                )
            )
            return False
        status_line = await self._process.stderr.readline()
        if status_line.startswith(b"[OKAY]"):
            utils.safe_ensure_future(self._log_stderr())
            return True
        elif status_line.startswith(b"[FAIL]"):
            utils.logger.warn(
                "[%s] Connect %s:%d failed: %s"
                % (
                    self.__class__.__name__,
                    self._addr,
                    self._port,
                    status_line.decode(),
                )
            )
            return False
        else:
            raise RuntimeError("Unexpected stderr: %s" % status_line.decode())

    async def read(self):
        if self._process:
            buffer = await self._process.stdout.read(4096)
            if buffer:
                return buffer
        raise utils.TunnelClosedError()

    async def write(self, buffer):
        if self._process:
            return self._process.stdin.write(buffer)
        else:
            raise utils.TunnelClosedError()

    def closed(self):
        return self._process is None or self._process.exit_status is not None

    def close(self):
        if self._process:
            self._process.stdin.write(b"\x03")

完整的代码可以参考:turbo-tunnel

turbo-tunnel中可以使用以下方法将流量转发给SSH服务器:

turbo-tunnel -l http://:8080/ -t ssh+process://root:password@1.1.1.1:2222/usr/local/bin/telnet

/usr/local/bin/telnettelnet-go在服务器上的路径,需要设置好可执行权限。

然后,本地通过http://127.0.0.1:8080代理访问的流量都会转发到ssh服务器上,从而实现了通过ssh服务器进行端口转发的目的。

0x03 总结

利用进程的实时输入输出,可以解决SSH服务器不支持端口转发的问题,从而绕过服务器限制,建立通信通道。这种方式应用场景更广,也更加隐蔽,只是使用上需要提前将一个文件拷贝到SSH服务器上,这里可能少数场景会有些阻碍(例如删除了chmod命令),需要寻找绕过这些限制的方法。

不过总的来说,使用这种方法,大大提升了建立SSH隧道的成功率,具有较大的实际应用价值。

分享