From 966ca73b9be74b380d3d0ff171190ad1e0adc151 Mon Sep 17 00:00:00 2001 From: Evan Morrison Date: Mon, 27 Apr 2026 01:40:50 +0000 Subject: [PATCH] Initial commit: AIS vessel position tracking microservice Go service that connects to aisstream.io websocket, filtered by MMSIs from track_devices table. Writes positions to aisstream_pos (24h TTL) and upserts latest position per vessel to aisstream_last. Reloads MMSI list every 5 minutes to pick up changes. Co-Authored-By: Claude Opus 4.6 --- .gitignore | 1 + Dockerfile | 11 ++ docker-compose.yml | 18 +++ go.mod | 8 + go.sum | 4 + main.go | 360 +++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 402 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4c49bd7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b1cb53f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:1.22-alpine AS builder +WORKDIR /app +COPY go.mod go.sum ./ +RUN go mod download +COPY main.go . +RUN CGO_ENABLED=0 go build -o aisstream . + +FROM alpine:3.20 +RUN apk add --no-cache ca-certificates tzdata +COPY --from=builder /app/aisstream /usr/local/bin/aisstream +CMD ["aisstream"] diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..75a0999 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,18 @@ +services: + aisstream: + build: . + container_name: aisstream + environment: + POSTGRES_HOST: ${POSTGRES_HOST} + POSTGRES_DB: ${POSTGRES_DB} + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + POSTGRES_PORT: ${POSTGRES_PORT} + AISSTREAM_API_KEY: ${AISSTREAM_API_KEY} + networks: + - postgres_network + restart: unless-stopped + +networks: + postgres_network: + external: true diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e538cca --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module aisstream + +go 1.22 + +require ( + github.com/gorilla/websocket v1.5.3 + github.com/lib/pq v1.10.9 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c040992 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= diff --git a/main.go b/main.go new file mode 100644 index 0000000..191a230 --- /dev/null +++ b/main.go @@ -0,0 +1,360 @@ +package main + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "slices" + "sort" + "syscall" + "time" + + "github.com/gorilla/websocket" + _ "github.com/lib/pq" +) + +type SubscriptionMessage struct { + APIKey string `json:"APIKey"` + MessageType []int `json:"MessageType"` + FiltersShipMMSI []string `json:"FiltersShipMMSI"` +} + +type Position struct { + Type string `json:"type"` + Coordinates []float64 `json:"coordinates"` +} + +type Geometry struct { + Type string `json:"type"` + Coordinates []float64 `json:"coordinates"` +} + +type MetaData struct { + MMSI int `json:"MMSI"` + ShipName string `json:"ShipName"` + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + TimeUtc string `json:"time_utc"` +} + +type PositionReport struct { + Cog float64 `json:"Cog"` + NavigationalStatus int `json:"NavigationalStatus"` + RateOfTurn float64 `json:"RateOfTurn"` + Sog float64 `json:"Sog"` + TrueHeading int `json:"TrueHeading"` + UserID int `json:"UserID"` + Latitude float64 `json:"Latitude"` + Longitude float64 `json:"Longitude"` +} + +type Message struct { + PositionReport *PositionReport `json:"PositionReport"` +} + +type AisStreamMessage struct { + MessageType string `json:"MessageType"` + MetaData MetaData `json:"MetaData"` + Message Message `json:"Message"` +} + +func getEnv(key string) string { + val := os.Getenv(key) + if val == "" { + log.Fatalf("required env var %s not set", key) + } + return val +} + +func connectDB() *sql.DB { + dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", + getEnv("POSTGRES_HOST"), + getEnv("POSTGRES_PORT"), + getEnv("POSTGRES_USER"), + getEnv("POSTGRES_PASSWORD"), + getEnv("POSTGRES_DB"), + ) + db, err := sql.Open("postgres", dsn) + if err != nil { + log.Fatalf("failed to open db: %v", err) + } + for i := 0; i < 30; i++ { + if err := db.Ping(); err == nil { + log.Println("connected to postgres") + return db + } + log.Printf("waiting for postgres (attempt %d/30)...", i+1) + time.Sleep(2 * time.Second) + } + log.Fatal("could not connect to postgres after 30 attempts") + return nil +} + +func createTables(db *sql.DB) { + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS aisstream_pos ( + id BIGSERIAL PRIMARY KEY, + mmsi INTEGER NOT NULL, + ship_name VARCHAR, + latitude DOUBLE PRECISION NOT NULL, + longitude DOUBLE PRECISION NOT NULL, + cog DOUBLE PRECISION, + sog DOUBLE PRECISION, + heading INTEGER, + nav_status INTEGER, + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + CREATE INDEX IF NOT EXISTS idx_aisstream_pos_received_at ON aisstream_pos (received_at); + CREATE INDEX IF NOT EXISTS idx_aisstream_pos_mmsi ON aisstream_pos (mmsi); + + CREATE TABLE IF NOT EXISTS aisstream_last ( + mmsi INTEGER PRIMARY KEY, + ship_name VARCHAR, + latitude DOUBLE PRECISION NOT NULL, + longitude DOUBLE PRECISION NOT NULL, + cog DOUBLE PRECISION, + sog DOUBLE PRECISION, + heading INTEGER, + nav_status INTEGER, + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + `) + if err != nil { + log.Fatalf("failed to create tables: %v", err) + } + log.Println("tables ready") +} + +func loadMMSIs(db *sql.DB) []string { + rows, err := db.Query("SELECT feed_id FROM track_devices WHERE device_type = 'AIS' AND feed_id IS NOT NULL") + if err != nil { + log.Fatalf("failed to query track_devices: %v", err) + } + defer rows.Close() + + var mmsis []string + for rows.Next() { + var mmsi string + if err := rows.Scan(&mmsi); err != nil { + log.Fatalf("failed to scan mmsi: %v", err) + } + mmsis = append(mmsis, mmsi) + } + log.Printf("loaded %d AIS MMSIs", len(mmsis)) + return mmsis +} + +func purgeOldPositions(db *sql.DB) { + ticker := time.NewTicker(10 * time.Minute) + defer ticker.Stop() + for range ticker.C { + res, err := db.Exec("DELETE FROM aisstream_pos WHERE received_at < NOW() - INTERVAL '24 hours'") + if err != nil { + log.Printf("purge error: %v", err) + continue + } + n, _ := res.RowsAffected() + if n > 0 { + log.Printf("purged %d old positions", n) + } + } +} + +func insertPosition(db *sql.DB, mmsi int, shipName string, lat, lon, cog, sog float64, heading, navStatus int) { + _, err := db.Exec(` + INSERT INTO aisstream_pos (mmsi, ship_name, latitude, longitude, cog, sog, heading, nav_status) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, + mmsi, shipName, lat, lon, cog, sog, heading, navStatus, + ) + if err != nil { + log.Printf("insert pos error: %v", err) + } +} + +func upsertLast(db *sql.DB, mmsi int, shipName string, lat, lon, cog, sog float64, heading, navStatus int) { + _, err := db.Exec(` + INSERT INTO aisstream_last (mmsi, ship_name, latitude, longitude, cog, sog, heading, nav_status, received_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW()) + ON CONFLICT (mmsi) DO UPDATE SET + ship_name = EXCLUDED.ship_name, + latitude = EXCLUDED.latitude, + longitude = EXCLUDED.longitude, + cog = EXCLUDED.cog, + sog = EXCLUDED.sog, + heading = EXCLUDED.heading, + nav_status = EXCLUDED.nav_status, + received_at = EXCLUDED.received_at`, + mmsi, shipName, lat, lon, cog, sog, heading, navStatus, + ) + if err != nil { + log.Printf("upsert last error: %v", err) + } +} + +func connectWebSocket(apiKey string, mmsis []string) *websocket.Conn { + for { + ws, _, err := websocket.DefaultDialer.Dial("wss://stream.aisstream.io/v0/stream", nil) + if err != nil { + log.Printf("websocket dial error: %v, retrying in 10s...", err) + time.Sleep(10 * time.Second) + continue + } + + // MessageType 1,2,3 are position reports + sub := SubscriptionMessage{ + APIKey: apiKey, + MessageType: []int{1, 2, 3}, + FiltersShipMMSI: mmsis, + } + msg, _ := json.Marshal(sub) + if err := ws.WriteMessage(websocket.TextMessage, msg); err != nil { + log.Printf("subscribe error: %v, retrying in 10s...", err) + ws.Close() + time.Sleep(10 * time.Second) + continue + } + + log.Printf("subscribed to aisstream with %d MMSIs", len(mmsis)) + return ws + } +} + +func mmsiEqual(a, b []string) bool { + if len(a) != len(b) { + return false + } + sa := make([]string, len(a)) + sb := make([]string, len(b)) + copy(sa, a) + copy(sb, b) + sort.Strings(sa) + sort.Strings(sb) + return slices.Equal(sa, sb) +} + +func main() { + log.SetFlags(log.Ldate | log.Ltime | log.Lmsgprefix) + log.SetPrefix("[aisstream] ") + + db := connectDB() + defer db.Close() + + createTables(db) + + mmsis := loadMMSIs(db) + if len(mmsis) == 0 { + log.Fatal("no AIS devices found in track_devices") + } + + apiKey := getEnv("AISSTREAM_API_KEY") + + go purgeOldPositions(db) + + reconnectCh := make(chan []string, 1) + go func() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + for range ticker.C { + updated := loadMMSIs(db) + if len(updated) > 0 && !mmsiEqual(mmsis, updated) { + log.Printf("MMSI list changed (%d -> %d), reconnecting...", len(mmsis), len(updated)) + reconnectCh <- updated + } + } + }() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + ws := connectWebSocket(apiKey, mmsis) + defer ws.Close() + + go func() { + <-sigCh + log.Println("shutting down...") + ws.Close() + db.Close() + os.Exit(0) + }() + + msgCh := make(chan []byte, 64) + go func() { + for { + _, p, err := ws.ReadMessage() + if err != nil { + log.Printf("read error: %v", err) + msgCh <- nil + return + } + msgCh <- p + } + }() + + var count int64 + for { + select { + case newMMSIs := <-reconnectCh: + mmsis = newMMSIs + ws.Close() + ws = connectWebSocket(apiKey, mmsis) + go func() { + for { + _, p, err := ws.ReadMessage() + if err != nil { + log.Printf("read error: %v", err) + msgCh <- nil + return + } + msgCh <- p + } + }() + + case p := <-msgCh: + if p == nil { + log.Println("reconnecting after read error...") + ws.Close() + ws = connectWebSocket(apiKey, mmsis) + go func() { + for { + _, p, err := ws.ReadMessage() + if err != nil { + log.Printf("read error: %v", err) + msgCh <- nil + return + } + msgCh <- p + } + }() + continue + } + + var packet AisStreamMessage + if err := json.Unmarshal(p, &packet); err != nil { + log.Printf("unmarshal error: %v", err) + continue + } + + if packet.MessageType != "PositionReport" { + continue + } + + pr := packet.Message.PositionReport + if pr == nil { + continue + } + + insertPosition(db, pr.UserID, packet.MetaData.ShipName, + pr.Latitude, pr.Longitude, pr.Cog, pr.Sog, pr.TrueHeading, pr.NavigationalStatus) + upsertLast(db, pr.UserID, packet.MetaData.ShipName, + pr.Latitude, pr.Longitude, pr.Cog, pr.Sog, pr.TrueHeading, pr.NavigationalStatus) + + count++ + if count%100 == 0 { + log.Printf("processed %d positions", count) + } + } + } +}