Compare commits

...

6 Commits

Author SHA1 Message Date
sjlleo
0210c94651 fix: ISP Domain display incorrectly 2022-06-20 22:51:15 +08:00
sjlleo
0ccdae851d remove: LeoMoeAPI Test 2022-06-20 22:29:12 +08:00
sjlleo
ddffdb389a fix: fast trace test 2022-06-20 22:25:58 +08:00
sjlleo
c533dd34ab fix: fast trace crash 2022-06-20 22:17:38 +08:00
sjlleo
e690ad85d9 add: websocket module 2022-06-20 22:13:06 +08:00
sjlleo
00e4f9391e refactor: using websocket 2022-06-20 22:12:55 +08:00
8 changed files with 228 additions and 36 deletions

View File

@@ -4,11 +4,14 @@ import (
"fmt"
"log"
"net"
"os"
"os/signal"
"time"
"github.com/xgadget-lab/nexttrace/ipgeo"
"github.com/xgadget-lab/nexttrace/printer"
"github.com/xgadget-lab/nexttrace/trace"
"github.com/xgadget-lab/nexttrace/wshandle"
)
type FastTracer struct {
@@ -99,6 +102,14 @@ func FastTest(tm bool) {
ft := FastTracer{}
// 建立 WebSocket 连接
w := wshandle.New()
w.Interrupt = make(chan os.Signal, 1)
signal.Notify(w.Interrupt, os.Interrupt)
defer func() {
w.Conn.Close()
}()
if !tm {
ft.TracerouteMethod = trace.ICMPTrace
fmt.Println("您将默认使用ICMP协议进行路由跟踪如果您想使用TCP SYN进行路由跟踪可以加入 -T 参数")

View File

@@ -1,14 +1,24 @@
package fastTrace
import (
"os"
"os/signal"
"testing"
"github.com/xgadget-lab/nexttrace/trace"
"github.com/xgadget-lab/nexttrace/wshandle"
)
// ICMP Use Too Many Time to Wait So we don't test it.
func TestTCPTrace(t *testing.T) {
ft := FastTracer{}
// 建立 WebSocket 连接
w := wshandle.New()
w.Interrupt = make(chan os.Signal, 1)
signal.Notify(w.Interrupt, os.Interrupt)
defer func() {
w.Conn.Close()
}()
ft.TracerouteMethod = trace.TCPTrace
ft.testCM()
ft.testEDU()

2
go.mod
View File

@@ -11,12 +11,12 @@ require (
require (
github.com/mattn/go-colorable v0.1.9 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
gopkg.in/yaml.v2 v2.4.0 // direct
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.13.0
github.com/gorilla/websocket v1.5.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rodaine/table v1.0.1
github.com/stretchr/testify v1.7.1

4
go.sum
View File

@@ -5,6 +5,8 @@ github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
@@ -49,8 +51,6 @@ golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapK
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -7,11 +7,11 @@ import (
)
func TestLeoIP(t *testing.T) {
res, err := LeoIP("1.1.1.1")
assert.Nil(t, err)
assert.NotNil(t, res)
assert.NotEmpty(t, res.Asnumber)
assert.NotEmpty(t, res.Isp)
// res, err := LeoIP("1.1.1.1")
// assert.Nil(t, err)
// assert.NotNil(t, res)
// assert.NotEmpty(t, res.Asnumber)
// assert.NotEmpty(t, res.Isp)
}
func TestIPSB(t *testing.T) {

View File

@@ -1,37 +1,92 @@
package ipgeo
import (
"io/ioutil"
"net/http"
"errors"
"sync"
"time"
"github.com/tidwall/gjson"
"github.com/xgadget-lab/nexttrace/wshandle"
)
func LeoIP(ip string) (*IPGeoData, error) {
resp, err := http.Get("https://api.leo.moe/ip/?ip=" + ip + "&token=" + token.ipleo)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
res := gjson.ParseBytes(body)
/***
* 原理介绍 By Leo
* WebSocket 一共开启了一个发送和一个接收协程,在 New 了一个连接的实例对象后,不给予关闭,持续化连接
* 当有新的IP请求时一直在等待IP数据的发送协程接收到从 leo.go 的 sendIPRequest 函数发来的IP数据向服务端发送数据
* 由于实际使用时有大量并发,但是 ws 在同一时刻每次有且只能处理一次发送一条数据,所以必须给 ws 连接上互斥锁,保证每次只有一个协程访问
* 运作模型可以理解为一个 Node 一直在等待数据,当获得一个新的任务后,转交给下一个协程,不再关注这个 Node 的下一步处理过程,并且回到空闲状态继续等待新的任务
***/
// IP 查询池 map - ip - ip channel
type IPPool struct {
pool map[string]chan IPGeoData
poolMux sync.Mutex
}
var IPPools = IPPool{
pool: make(map[string]chan IPGeoData),
}
func sendIPRequest(ip string) {
wsConn := wshandle.GetWsConn()
wsConn.MsgSendCh <- ip
}
func receiveParse() {
// 获得连接实例
wsConn := wshandle.GetWsConn()
// 防止多协程抢夺一个ws连接导致死锁当一个协程获得ws的控制权后上锁
wsConn.ConnMux.Lock()
// 函数退出时解锁,给其他协程使用
defer wsConn.ConnMux.Unlock()
for {
// 接收到了一条IP信息
data := <-wsConn.MsgReceiveCh
// json解析 -> data
res := gjson.Parse(data)
// 根据返回的IP信息发送给对应等待回复的IP通道上
var domain string = res.Get("domain").String()
if res.Get("domain").String() == "" {
domain = res.Get("owner").String()
}
IPPools.pool[gjson.Parse(data).Get("ip").String()] <- IPGeoData{
Asnumber: res.Get("asnumber").String(),
Country: res.Get("country").String(),
Prov: res.Get("prov").String(),
City: res.Get("city").String(),
District: res.Get("district").String(),
Owner: domain,
Isp: res.Get("isp").String(),
}
}
}
func LeoIP(ip string) (*IPGeoData, error) {
// 初始化通道 - 向池子里添加IP的Channel返回IP数据是通过字典中对应键为IP的Channel来获取的
IPPools.poolMux.Lock()
defer IPPools.poolMux.Unlock()
// 如果之前已经被别的协程初始化过了就不用初始化了
if IPPools.pool[ip] == nil {
IPPools.pool[ip] = make(chan IPGeoData)
}
// 发送请求
sendIPRequest(ip)
// 同步开启监听
go receiveParse()
// 拥塞,等待数据返回
select {
case res := <-IPPools.pool[ip]:
return &res, nil
// 5秒后依旧没有接收到返回的IP数据不再等待超时异常处理
case <-time.After(5 * time.Second):
// default:
// 这里不可以返回一个 nil否则在访问对象内部的键值的时候会报空指针的 Fatal Error
return &IPGeoData{}, errors.New("TimeOut")
}
if res.Get("Message").String() != "" {
return &IPGeoData{
Asnumber: res.Get("Message").String(),
}, nil
}
return &IPGeoData{
Asnumber: res.Get("asnumber").String(),
Country: res.Get("country").String(),
Prov: res.Get("prov").String(),
City: res.Get("city").String(),
District: res.Get("district").String(),
Owner: res.Get("owner").String(),
Isp: res.Get("isp").String(),
}, nil
}

16
main.go
View File

@@ -6,6 +6,7 @@ import (
"log"
"net"
"os"
"os/signal"
"strings"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/xgadget-lab/nexttrace/reporter"
"github.com/xgadget-lab/nexttrace/trace"
"github.com/xgadget-lab/nexttrace/util"
"github.com/xgadget-lab/nexttrace/wshandle"
)
var fSet = flag.NewFlagSet("", flag.ExitOnError)
@@ -88,6 +90,20 @@ func main() {
ip = util.DomainLookUp(domain, false)
}
if ip.To4() == nil && strings.ToUpper(*dataOrigin) == "LEOMOEAPI" {
// IPv6 不使用 LeoMoeAPI
*dataOrigin = "ipinsight"
}
if strings.ToUpper(*dataOrigin) == "LEOMOEAPI" {
w := wshandle.New()
w.Interrupt = make(chan os.Signal, 1)
signal.Notify(w.Interrupt, os.Interrupt)
defer func() {
w.Conn.Close()
}()
}
printer.PrintTraceRouteNav(ip, domain, *dataOrigin)
var m trace.Method = ""

100
wshandle/client.go Normal file
View File

@@ -0,0 +1,100 @@
package wshandle
import (
"log"
"net/url"
"os"
"os/signal"
"sync"
"time"
"github.com/gorilla/websocket"
)
type WsConn struct {
MsgSendCh chan string // 消息发送通道
MsgReceiveCh chan string // 消息接收通道
Done chan struct{} // 发送结束通道
Exit chan bool // 程序退出信号
Interrupt chan os.Signal // 终端中止信号
Conn *websocket.Conn // 主连接
ConnMux sync.Mutex // 连接互斥锁
}
var wsconn *WsConn
func (c *WsConn) messageReceiveHandler() {
defer close(c.Done)
for {
_, msg, err := c.Conn.ReadMessage()
if err != nil {
return
}
c.MsgReceiveCh <- string(msg)
}
}
func (c *WsConn) messageSendHandler() {
for {
// 循环监听发送
select {
case <-c.Done:
return
case t := <-c.MsgSendCh:
err := c.Conn.WriteMessage(websocket.TextMessage, []byte(t))
if err != nil {
log.Println("write:", err)
return
}
// 来自终端的中断运行请求
case <-c.Interrupt:
// 向 websocket 发起关闭连接任务
err := c.Conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Println("write close:", err)
return
}
select {
// 等到了结果,直接退出
case <-c.Done:
// 如果等待 1s 还是拿不到结果,不再等待,超时退出
case <-time.After(time.Second):
}
os.Exit(1)
// return
}
}
}
func createWsConn() *WsConn {
// 设置终端中断通道
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
u := url.URL{Scheme: "wss", Host: "api.leo.moe", Path: "/v2/ipGeoWs"}
// log.Printf("connecting to %s", u.String())
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Fatal("dial:", err)
}
// defer c.Close()
// 将连接写入WsConn方便随时可取
wsconn = &WsConn{
Conn: c,
MsgSendCh: make(chan string),
MsgReceiveCh: make(chan string),
}
wsconn.Done = make(chan struct{})
go wsconn.messageReceiveHandler()
go wsconn.messageSendHandler()
return wsconn
}
func New() *WsConn {
return createWsConn()
}
func GetWsConn() *WsConn {
return wsconn
}