LocalAI/pkg/model/watchdog.go

157 lines
4.1 KiB
Go
Raw Normal View History

package model
import (
"sync"
"time"
process "github.com/mudler/go-processmanager"
"github.com/rs/zerolog/log"
)
// WatchDog tracks, per backend address, whether the backend is currently busy
// or idle and since when.
//
// All GRPC clients created by ModelLoader should have an associated injected
// watchdog. State changes are reported through Mark (backend became busy) and
// UnMark (backend became idle); both are guarded by the embedded mutex.
// Run is meant to execute in its own goroutine: it periodically shuts down,
// through the ProcessManager, any model whose backend has been busy or idle
// for longer than the configured limits, which forces a reload of the model
// on next use.
type WatchDog struct {
	// Embedded mutex guarding every map below.
	sync.Mutex

	// timetable maps a backend address to the time Mark was last called for it.
	timetable map[string]time.Time
	// idleTime maps a backend address to the time UnMark was last called for it.
	idleTime map[string]time.Time

	// timeout is the maximum time a backend may stay busy (see checkBusy).
	timeout time.Duration
	// idletimeout is the maximum time a backend may stay idle (see checkIdle).
	idletimeout time.Duration

	// addressMap maps a backend address to the process registered via Add.
	addressMap map[string]*process.Process
	// addressModelMap maps a backend address to the model name handed to
	// pm.ShutdownModel.
	addressModelMap map[string]string

	// pm performs the actual model shutdown.
	pm ProcessManager
	// stop tells Run to return.
	stop chan bool

	// busyCheck enables the busy-timeout check in Run.
	busyCheck bool
	// idleCheck enables the idle-timeout check in Run.
	idleCheck bool
}
// ProcessManager is the hook the WatchDog uses to terminate a backend whose
// busy or idle time exceeded its limit.
type ProcessManager interface {
	// ShutdownModel stops the model identified by modelName and reports any
	// failure through the returned error.
ShutdownModel(modelName string) error
}
// NewWatchDog builds a WatchDog with empty bookkeeping tables.
//
// pm is used to shut models down. timeoutBusy and timeoutIdle are the maximum
// durations a backend may remain busy or idle before it is shut down; busy and
// idle enable the respective checks performed by Run.
func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy, idle bool) *WatchDog {
	return &WatchDog{
		timeout:         timeoutBusy,
		idletimeout:     timeoutIdle,
		pm:              pm,
		timetable:       make(map[string]time.Time),
		idleTime:        make(map[string]time.Time),
		addressMap:      make(map[string]*process.Process),
		busyCheck:       busy,
		idleCheck:       idle,
		addressModelMap: make(map[string]string),
		// The stop channel must be allocated: a nil channel blocks forever on
		// both send and receive, so Shutdown would hang and Run could never be
		// stopped. One slot of buffering lets Shutdown hand over the signal
		// even while Run is in the middle of a check rather than waiting in
		// its select.
		stop: make(chan bool, 1),
	}
}
// Shutdown asks Run to exit by sending on the stop channel. It returns once
// the value has been accepted, either by Run or by the channel's buffer.
//
// The mutex is intentionally NOT held while sending. Run's periodic checks
// acquire the same mutex, so sending under the lock can deadlock: Shutdown
// would wait for a receiver while Run waits for the lock. Holding the lock
// during a blocked send would also stall every Mark/UnMark caller. The stop
// field is only read here, never written, so no locking is needed.
func (wd *WatchDog) Shutdown() {
	wd.stop <- true
}
// AddAddressModelMap associates a backend address with the name of the model
// it serves, overwriting any previous association for that address.
func (wd *WatchDog) AddAddressModelMap(address, model string) {
	wd.Mutex.Lock()
	defer wd.Mutex.Unlock()

	wd.addressModelMap[address] = model
}
// Add registers the process p as the one backing the given address,
// replacing any process previously registered for it.
func (wd *WatchDog) Add(address string, p *process.Process) {
	wd.Mutex.Lock()
	defer wd.Mutex.Unlock()

	wd.addressMap[address] = p
}
// Mark flags the backend at address as busy starting now: its idle timestamp
// is dropped and the current time is stored in the busy table.
func (wd *WatchDog) Mark(address string) {
	wd.Mutex.Lock()
	defer wd.Mutex.Unlock()

	// The two tables are independent; the backend ends up in exactly one.
	delete(wd.idleTime, address)
	wd.timetable[address] = time.Now()
}
// UnMark flags the backend at ModelAddress as idle starting now: the current
// time is stored in the idle table and its busy timestamp is dropped.
func (wd *WatchDog) UnMark(ModelAddress string) {
	wd.Mutex.Lock()
	defer wd.Mutex.Unlock()

	// The two tables are independent; the backend ends up in exactly one.
	wd.idleTime[ModelAddress] = time.Now()
	delete(wd.timetable, ModelAddress)
}
// Run is the watchdog loop. It blocks until a value arrives on the stop
// channel, waking up every 30 seconds to perform whichever of the busy and
// idle checks are enabled. When neither check is enabled it returns at the
// first wake-up instead of looping forever.
func (wd *WatchDog) Run() {
	const interval = 30 * time.Second

	log.Info().Msg("[WatchDog] starting watchdog")
	for {
		select {
		case <-time.After(interval):
			// Nothing to supervise: leave the loop for good.
			if !(wd.busyCheck || wd.idleCheck) {
				log.Info().Msg("[WatchDog] No checks enabled, stopping watchdog")
				return
			}
			// Busy check runs before the idle check.
			if wd.busyCheck {
				wd.checkBusy()
			}
			if wd.idleCheck {
				wd.checkIdle()
			}
		case <-wd.stop:
			log.Info().Msg("[WatchDog] Stopping watchdog")
			return
		}
	}
}
// checkIdle walks the idle table and, for every backend that has been idle
// longer than wd.idletimeout, asks the ProcessManager to shut its model down
// and removes the backend from the watchdog's tables. An address with no
// known model is only dropped from the idle table.
//
// Bookkeeping is removed even when ShutdownModel fails, so a failed shutdown
// is not retried on the next pass.
//
// NOTE(review): pm.ShutdownModel is invoked while the watchdog mutex is held.
// If an implementation ever calls back into the WatchDog this would deadlock;
// confirm against the ProcessManager implementation.
func (wd *WatchDog) checkIdle() {
	wd.Lock()
	defer wd.Unlock()

	log.Debug().Msg("[WatchDog] Watchdog checks for idle connections")
	// Deleting entries of the map being ranged over is safe in Go.
	for address, t := range wd.idleTime {
		log.Debug().Msgf("[WatchDog] %s: idle connection", address)
		if time.Since(t) <= wd.idletimeout {
			continue
		}

		log.Warn().Msgf("[WatchDog] Address %s is idle for too long, killing it", address)
		model, ok := wd.addressModelMap[address]
		if !ok {
			log.Warn().Msgf("[WatchDog] Address %s unresolvable", address)
			delete(wd.idleTime, address)
			continue
		}

		// Report success only when the shutdown actually succeeded.
		if err := wd.pm.ShutdownModel(model); err != nil {
			log.Error().Msgf("[watchdog] Error shutting down model %s: %v", model, err)
		} else {
			log.Debug().Msgf("[WatchDog] model shut down: %s", address)
		}
		delete(wd.idleTime, address)
		delete(wd.addressModelMap, address)
		delete(wd.addressMap, address)
	}
}
// checkBusy walks the busy table and, for every backend that has been busy
// longer than wd.timeout, asks the ProcessManager to shut its model down and
// removes the backend from the watchdog's tables. An address with no known
// model is only dropped from the busy table.
//
// Bookkeeping is removed even when ShutdownModel fails, so a failed shutdown
// is not retried on the next pass.
//
// NOTE(review): pm.ShutdownModel is invoked while the watchdog mutex is held.
// If an implementation ever calls back into the WatchDog this would deadlock;
// confirm against the ProcessManager implementation.
func (wd *WatchDog) checkBusy() {
	wd.Lock()
	defer wd.Unlock()

	log.Debug().Msg("[WatchDog] Watchdog checks for busy connections")
	// Deleting entries of the map being ranged over is safe in Go.
	for address, t := range wd.timetable {
		log.Debug().Msgf("[WatchDog] %s: active connection", address)
		if time.Since(t) <= wd.timeout {
			continue
		}

		model, ok := wd.addressModelMap[address]
		if !ok {
			log.Warn().Msgf("[WatchDog] Address %s unresolvable", address)
			delete(wd.timetable, address)
			continue
		}

		log.Warn().Msgf("[WatchDog] Model %s is busy for too long, killing it", model)
		// Report success only when the shutdown actually succeeded.
		if err := wd.pm.ShutdownModel(model); err != nil {
			log.Error().Msgf("[watchdog] Error shutting down model %s: %v", model, err)
		} else {
			log.Debug().Msgf("[WatchDog] model shut down: %s", address)
		}
		delete(wd.timetable, address)
		delete(wd.addressModelMap, address)
		delete(wd.addressMap, address)
	}
}