mirror of
https://github.com/mudler/LocalAI.git
synced 2024-06-07 19:40:48 +00:00
feat: queue up requests if not running parallel requests (#1296)
Return a GRPC which handles a lock in case it is not meant to be parallel. Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
parent
2addb9f99a
commit
548959b50f
@ -123,13 +123,12 @@ func BackendMonitorEndpoint(bm BackendMonitor) func(c *fiber.Ctx) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
client := bm.options.Loader.CheckIsLoaded(backendId)
|
model := bm.options.Loader.CheckIsLoaded(backendId)
|
||||||
|
if model == "" {
|
||||||
if client == "" {
|
|
||||||
return fmt.Errorf("backend %s is not currently loaded", backendId)
|
return fmt.Errorf("backend %s is not currently loaded", backendId)
|
||||||
}
|
}
|
||||||
|
|
||||||
status, rpcErr := client.GRPC().Status(context.TODO())
|
status, rpcErr := model.GRPC(false).Status(context.TODO())
|
||||||
if rpcErr != nil {
|
if rpcErr != nil {
|
||||||
log.Warn().Msgf("backend %s experienced an error retrieving status info: %s", backendId, rpcErr.Error())
|
log.Warn().Msgf("backend %s experienced an error retrieving status info: %s", backendId, rpcErr.Error())
|
||||||
val, slbErr := bm.SampleLocalBackendProcess(backendId)
|
val, slbErr := bm.SampleLocalBackendProcess(backendId)
|
||||||
|
@ -14,14 +14,17 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
address string
|
address string
|
||||||
busy bool
|
busy bool
|
||||||
|
parallel bool
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
opMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(address string) *Client {
|
func NewClient(address string, parallel bool) *Client {
|
||||||
return &Client{
|
return &Client{
|
||||||
address: address,
|
address: address,
|
||||||
|
parallel: parallel,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,6 +41,10 @@ func (c *Client) setBusy(v bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) HealthCheck(ctx context.Context) bool {
|
func (c *Client) HealthCheck(ctx context.Context) bool {
|
||||||
|
if !c.parallel {
|
||||||
|
c.opMutex.Lock()
|
||||||
|
defer c.opMutex.Unlock()
|
||||||
|
}
|
||||||
c.setBusy(true)
|
c.setBusy(true)
|
||||||
defer c.setBusy(false)
|
defer c.setBusy(false)
|
||||||
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
@ -66,6 +73,10 @@ func (c *Client) HealthCheck(ctx context.Context) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Embeddings(ctx context.Context, in *pb.PredictOptions, opts ...grpc.CallOption) (*pb.EmbeddingResult, error) {
|
func (c *Client) Embeddings(ctx context.Context, in *pb.PredictOptions, opts ...grpc.CallOption) (*pb.EmbeddingResult, error) {
|
||||||
|
if !c.parallel {
|
||||||
|
c.opMutex.Lock()
|
||||||
|
defer c.opMutex.Unlock()
|
||||||
|
}
|
||||||
c.setBusy(true)
|
c.setBusy(true)
|
||||||
defer c.setBusy(false)
|
defer c.setBusy(false)
|
||||||
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
@ -79,6 +90,10 @@ func (c *Client) Embeddings(ctx context.Context, in *pb.PredictOptions, opts ...
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Predict(ctx context.Context, in *pb.PredictOptions, opts ...grpc.CallOption) (*pb.Reply, error) {
|
func (c *Client) Predict(ctx context.Context, in *pb.PredictOptions, opts ...grpc.CallOption) (*pb.Reply, error) {
|
||||||
|
if !c.parallel {
|
||||||
|
c.opMutex.Lock()
|
||||||
|
defer c.opMutex.Unlock()
|
||||||
|
}
|
||||||
c.setBusy(true)
|
c.setBusy(true)
|
||||||
defer c.setBusy(false)
|
defer c.setBusy(false)
|
||||||
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
@ -92,6 +107,10 @@ func (c *Client) Predict(ctx context.Context, in *pb.PredictOptions, opts ...grp
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) LoadModel(ctx context.Context, in *pb.ModelOptions, opts ...grpc.CallOption) (*pb.Result, error) {
|
func (c *Client) LoadModel(ctx context.Context, in *pb.ModelOptions, opts ...grpc.CallOption) (*pb.Result, error) {
|
||||||
|
if !c.parallel {
|
||||||
|
c.opMutex.Lock()
|
||||||
|
defer c.opMutex.Unlock()
|
||||||
|
}
|
||||||
c.setBusy(true)
|
c.setBusy(true)
|
||||||
defer c.setBusy(false)
|
defer c.setBusy(false)
|
||||||
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
@ -104,6 +123,10 @@ func (c *Client) LoadModel(ctx context.Context, in *pb.ModelOptions, opts ...grp
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) PredictStream(ctx context.Context, in *pb.PredictOptions, f func(s []byte), opts ...grpc.CallOption) error {
|
func (c *Client) PredictStream(ctx context.Context, in *pb.PredictOptions, f func(s []byte), opts ...grpc.CallOption) error {
|
||||||
|
if !c.parallel {
|
||||||
|
c.opMutex.Lock()
|
||||||
|
defer c.opMutex.Unlock()
|
||||||
|
}
|
||||||
c.setBusy(true)
|
c.setBusy(true)
|
||||||
defer c.setBusy(false)
|
defer c.setBusy(false)
|
||||||
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
@ -135,6 +158,10 @@ func (c *Client) PredictStream(ctx context.Context, in *pb.PredictOptions, f fun
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) GenerateImage(ctx context.Context, in *pb.GenerateImageRequest, opts ...grpc.CallOption) (*pb.Result, error) {
|
func (c *Client) GenerateImage(ctx context.Context, in *pb.GenerateImageRequest, opts ...grpc.CallOption) (*pb.Result, error) {
|
||||||
|
if !c.parallel {
|
||||||
|
c.opMutex.Lock()
|
||||||
|
defer c.opMutex.Unlock()
|
||||||
|
}
|
||||||
c.setBusy(true)
|
c.setBusy(true)
|
||||||
defer c.setBusy(false)
|
defer c.setBusy(false)
|
||||||
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
@ -147,6 +174,10 @@ func (c *Client) GenerateImage(ctx context.Context, in *pb.GenerateImageRequest,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) TTS(ctx context.Context, in *pb.TTSRequest, opts ...grpc.CallOption) (*pb.Result, error) {
|
func (c *Client) TTS(ctx context.Context, in *pb.TTSRequest, opts ...grpc.CallOption) (*pb.Result, error) {
|
||||||
|
if !c.parallel {
|
||||||
|
c.opMutex.Lock()
|
||||||
|
defer c.opMutex.Unlock()
|
||||||
|
}
|
||||||
c.setBusy(true)
|
c.setBusy(true)
|
||||||
defer c.setBusy(false)
|
defer c.setBusy(false)
|
||||||
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
@ -159,6 +190,10 @@ func (c *Client) TTS(ctx context.Context, in *pb.TTSRequest, opts ...grpc.CallOp
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) AudioTranscription(ctx context.Context, in *pb.TranscriptRequest, opts ...grpc.CallOption) (*schema.Result, error) {
|
func (c *Client) AudioTranscription(ctx context.Context, in *pb.TranscriptRequest, opts ...grpc.CallOption) (*schema.Result, error) {
|
||||||
|
if !c.parallel {
|
||||||
|
c.opMutex.Lock()
|
||||||
|
defer c.opMutex.Unlock()
|
||||||
|
}
|
||||||
c.setBusy(true)
|
c.setBusy(true)
|
||||||
defer c.setBusy(false)
|
defer c.setBusy(false)
|
||||||
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
@ -191,6 +226,10 @@ func (c *Client) AudioTranscription(ctx context.Context, in *pb.TranscriptReques
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) TokenizeString(ctx context.Context, in *pb.PredictOptions, opts ...grpc.CallOption) (*pb.TokenizationResponse, error) {
|
func (c *Client) TokenizeString(ctx context.Context, in *pb.PredictOptions, opts ...grpc.CallOption) (*pb.TokenizationResponse, error) {
|
||||||
|
if !c.parallel {
|
||||||
|
c.opMutex.Lock()
|
||||||
|
defer c.opMutex.Unlock()
|
||||||
|
}
|
||||||
c.setBusy(true)
|
c.setBusy(true)
|
||||||
defer c.setBusy(false)
|
defer c.setBusy(false)
|
||||||
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
@ -209,6 +248,10 @@ func (c *Client) TokenizeString(ctx context.Context, in *pb.PredictOptions, opts
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Status(ctx context.Context) (*pb.StatusResponse, error) {
|
func (c *Client) Status(ctx context.Context) (*pb.StatusResponse, error) {
|
||||||
|
if !c.parallel {
|
||||||
|
c.opMutex.Lock()
|
||||||
|
defer c.opMutex.Unlock()
|
||||||
|
}
|
||||||
c.setBusy(true)
|
c.setBusy(true)
|
||||||
defer c.setBusy(false)
|
defer c.setBusy(false)
|
||||||
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
@ -121,7 +121,7 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
|
|||||||
// Wait for the service to start up
|
// Wait for the service to start up
|
||||||
ready := false
|
ready := false
|
||||||
for i := 0; i < o.grpcAttempts; i++ {
|
for i := 0; i < o.grpcAttempts; i++ {
|
||||||
if client.GRPC().HealthCheck(context.Background()) {
|
if client.GRPC(o.parallelRequests).HealthCheck(context.Background()) {
|
||||||
log.Debug().Msgf("GRPC Service Ready")
|
log.Debug().Msgf("GRPC Service Ready")
|
||||||
ready = true
|
ready = true
|
||||||
break
|
break
|
||||||
@ -140,7 +140,7 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
|
|||||||
|
|
||||||
log.Debug().Msgf("GRPC: Loading model with options: %+v", options)
|
log.Debug().Msgf("GRPC: Loading model with options: %+v", options)
|
||||||
|
|
||||||
res, err := client.GRPC().LoadModel(o.context, &options)
|
res, err := client.GRPC(o.parallelRequests).LoadModel(o.context, &options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("could not load model: %w", err)
|
return "", fmt.Errorf("could not load model: %w", err)
|
||||||
}
|
}
|
||||||
@ -154,11 +154,11 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
|
|||||||
|
|
||||||
func (ml *ModelLoader) resolveAddress(addr ModelAddress, parallel bool) (*grpc.Client, error) {
|
func (ml *ModelLoader) resolveAddress(addr ModelAddress, parallel bool) (*grpc.Client, error) {
|
||||||
if parallel {
|
if parallel {
|
||||||
return addr.GRPC(), nil
|
return addr.GRPC(parallel), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := ml.grpcClients[string(addr)]; !ok {
|
if _, ok := ml.grpcClients[string(addr)]; !ok {
|
||||||
ml.grpcClients[string(addr)] = addr.GRPC()
|
ml.grpcClients[string(addr)] = addr.GRPC(parallel)
|
||||||
}
|
}
|
||||||
return ml.grpcClients[string(addr)], nil
|
return ml.grpcClients[string(addr)], nil
|
||||||
}
|
}
|
||||||
|
@ -67,8 +67,8 @@ type ModelLoader struct {
|
|||||||
|
|
||||||
type ModelAddress string
|
type ModelAddress string
|
||||||
|
|
||||||
func (m ModelAddress) GRPC() *grpc.Client {
|
func (m ModelAddress) GRPC(parallel bool) *grpc.Client {
|
||||||
return grpc.NewClient(string(m))
|
return grpc.NewClient(string(m), parallel)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewModelLoader(modelPath string) *ModelLoader {
|
func NewModelLoader(modelPath string) *ModelLoader {
|
||||||
@ -147,10 +147,16 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ml *ModelLoader) CheckIsLoaded(s string) ModelAddress {
|
func (ml *ModelLoader) CheckIsLoaded(s string) ModelAddress {
|
||||||
|
var client *grpc.Client
|
||||||
if m, ok := ml.models[s]; ok {
|
if m, ok := ml.models[s]; ok {
|
||||||
log.Debug().Msgf("Model already loaded in memory: %s", s)
|
log.Debug().Msgf("Model already loaded in memory: %s", s)
|
||||||
|
if c, ok := ml.grpcClients[s]; ok {
|
||||||
|
client = c
|
||||||
|
} else {
|
||||||
|
client = m.GRPC(false)
|
||||||
|
}
|
||||||
|
|
||||||
if !m.GRPC().HealthCheck(context.Background()) {
|
if !client.HealthCheck(context.Background()) {
|
||||||
log.Debug().Msgf("GRPC Model not responding: %s", s)
|
log.Debug().Msgf("GRPC Model not responding: %s", s)
|
||||||
if !ml.grpcProcesses[s].IsAlive() {
|
if !ml.grpcProcesses[s].IsAlive() {
|
||||||
log.Debug().Msgf("GRPC Process is not responding: %s", s)
|
log.Debug().Msgf("GRPC Process is not responding: %s", s)
|
||||||
|
@ -17,7 +17,7 @@ import (
|
|||||||
func (ml *ModelLoader) StopAllExcept(s string) {
|
func (ml *ModelLoader) StopAllExcept(s string) {
|
||||||
ml.StopGRPC(func(id string, p *process.Process) bool {
|
ml.StopGRPC(func(id string, p *process.Process) bool {
|
||||||
if id != s {
|
if id != s {
|
||||||
for ml.models[id].GRPC().IsBusy() {
|
for ml.models[id].GRPC(false).IsBusy() {
|
||||||
log.Debug().Msgf("%s busy. Waiting.", id)
|
log.Debug().Msgf("%s busy. Waiting.", id)
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user