// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
	"context"
	"errors"
	"sync"
	"time"

	"code.gitea.io/gitea/modules/container"
)

// errChannelClosed is returned by baseChannel methods when the underlying
// channel is nil (PushItem, Len) or when a receive reports that the channel
// has been closed (PopItem).
var errChannelClosed = errors.New("channel is closed")

// baseChannel is an in-memory queue backed by a Go channel of byte slices,
// with optional tracking of the items it holds so duplicates can be rejected.
type baseChannel struct {
	// c carries the queued payloads; it is created with capacity cfg.Length.
	c chan []byte
	// set records the string form of payloads for unique queues. It is only
	// initialised when isUnique is true and stays nil otherwise.
	set container.Set[string]
	// mu guards set and is also held by Len, Close and RemoveAll.
	mu sync.Mutex

	// isUnique is fixed at construction; when true, PushItem rejects payloads
	// already present in set with ErrAlreadyInQueue.
	isUnique bool
}

// Compile-time check that *baseChannel is assignable to baseQueue.
var _ baseQueue = (*baseChannel)(nil)

// newBaseChannelGeneric builds a channel-backed queue whose channel has
// capacity cfg.Length. When unique is true the queue also receives an empty
// set used to track the payloads it holds. The returned error is always nil.
func newBaseChannelGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
	items := make(chan []byte, cfg.Length)
	bc := &baseChannel{
		c:        items,
		isUnique: unique,
	}
	if bc.isUnique {
		bc.set = container.Set[string]{}
	}
	return bc, nil
}

// newBaseChannelSimple creates a channel-backed queue without uniqueness
// tracking.
func newBaseChannelSimple(cfg *BaseConfig) (baseQueue, error) {
	const unique = false
	return newBaseChannelGeneric(cfg, unique)
}

// newBaseChannelUnique creates a channel-backed queue that tracks its
// payloads so duplicates can be rejected.
func newBaseChannelUnique(cfg *BaseConfig) (baseQueue, error) {
	const unique = true
	return newBaseChannelGeneric(cfg, unique)
}

// PushItem enqueues data, waiting at most pushBlockTime for room in the
// channel.
//
// It returns:
//   - errChannelClosed if the channel field is nil,
//   - ErrAlreadyInQueue if the queue is unique and data is already tracked,
//   - context.DeadlineExceeded if no room became available within pushBlockTime,
//   - ctx.Err() if ctx is done first.
//
// For unique queues the payload is registered in the set *before* it is sent,
// in a single critical section together with the duplicate check. Doing the
// check and the insert separately lets two concurrent pushes of the same
// payload both succeed, and inserting only after the send lets a fast consumer
// pop (and remove) the item first, leaving a stale entry that would reject the
// payload from then on. If the send does not happen, the registration is
// rolled back.
func (q *baseChannel) PushItem(ctx context.Context, data []byte) error {
	if q.c == nil {
		return errChannelClosed
	}

	var key string
	if q.isUnique {
		key = string(data)
		q.mu.Lock()
		if q.set.Contains(key) {
			q.mu.Unlock()
			return ErrAlreadyInQueue
		}
		q.set.Add(key)
		q.mu.Unlock()
	}

	// An explicit timer (instead of time.After) can be stopped as soon as the
	// push completes or the context is cancelled.
	timer := time.NewTimer(pushBlockTime)
	defer timer.Stop()

	var err error
	select {
	case q.c <- data:
		return nil
	case <-timer.C:
		err = context.DeadlineExceeded
	case <-ctx.Done():
		err = ctx.Err()
	}

	// The item never reached the channel: undo the reservation so that a later
	// push of the same payload is not rejected as a duplicate.
	if q.isUnique {
		q.mu.Lock()
		q.set.Remove(key)
		q.mu.Unlock()
	}
	return err
}

// PopItem blocks until an item can be received, the channel is closed, or ctx
// is done.
//
// It returns the received payload, errChannelClosed when the receive reports a
// closed channel, or ctx.Err() when ctx finishes first. For unique queues the
// payload is also removed from the tracking set.
func (q *baseChannel) PopItem(ctx context.Context) ([]byte, error) {
	select {
	case data, ok := <-q.c:
		if !ok {
			return nil, errChannelClosed
		}
		// Non-unique queues have no set to maintain, so skip the lock and the
		// []byte-to-string conversion for them (mirrors the guard in PushItem).
		if q.isUnique {
			q.mu.Lock()
			q.set.Remove(string(data))
			q.mu.Unlock()
		}
		return data, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// HasItem reports whether data is currently recorded in the tracking set.
// Non-unique queues do not track items, so the answer is always false for
// them. The returned error is always nil; ctx is unused.
func (q *baseChannel) HasItem(ctx context.Context, data []byte) (bool, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	tracked := q.isUnique && q.set.Contains(string(data))
	return tracked, nil
}

// Len returns the number of items currently buffered in the channel, or
// errChannelClosed when the channel field is nil. ctx is unused.
func (q *baseChannel) Len(ctx context.Context) (int, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if ch := q.c; ch != nil {
		return len(ch), nil
	}
	return 0, errChannelClosed
}

// Close closes the underlying channel and, for unique queues, replaces the
// tracking set with an empty one. It always returns nil.
//
// NOTE(review): the channel field is closed but not set to nil, so the nil
// checks in other methods do not observe a Close, and calling Close a second
// time would panic on the already-closed channel.
func (q *baseChannel) Close() error {
	q.mu.Lock()
	defer q.mu.Unlock()

	close(q.c)
	if !q.isUnique {
		return nil
	}

	q.set = container.Set[string]{}
	return nil
}

// RemoveAll discards every item currently buffered in the channel and, for
// unique queues, replaces the tracking set with an empty one. It always
// returns nil; ctx is unused.
func (q *baseChannel) RemoveAll(ctx context.Context) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	for len(q.c) > 0 {
		// Receive without blocking: a concurrent PopItem may take the last item
		// between the len check and the receive. A plain `<-q.c` would then
		// block while holding q.mu, stalling every method that needs the mutex.
		select {
		case <-q.c:
		default:
		}
	}

	if q.isUnique {
		q.set = container.Set[string]{}
	}
	return nil
}