整体思路:先判断是否命中缓存,没有的话再进入查询队列

依赖的包

https://github.com/ClickHouse/ClickHouse

https://github.com/prashanthpai/sqlcache => https://github.com/night1008/sqlcache

https://github.com/DATA-DOG/go-sqlmock

https://github.com/blockloop/scan

现在的问题是 clickhouse 的查询结果可能返回复杂类型,比如 Map(String, UInt8),
如果命中缓存,缓存的结果为 driver.Rows,需要转换成 *sql.Rows 方便后续使用,
因此想到通过 sqlmock 的方式,但是默认的 value converterdriver.DefaultParameterConverter,只能转换基础类型,
复杂类型也没有定义专门用于解析的结构体,会报诸如以下的错误,

panic: row #1, column #2 ("mapper") type map[string]uint8: unsupported type map[string]uint8, a map

好在 go-sqlmock 可以指定 ValueConverterOption,这样就可以把 driver.Rows 转换成 *sql.Rows 了。

以下为测试代码

package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"fmt"
	"io"
	"log"
        "regexp"

	"github.com/DATA-DOG/go-sqlmock"
	"github.com/prashanthpai/sqlcache"
	"github.com/prashanthpai/sqlcache/cache"

	"github.com/dgraph-io/ristretto"
	_ "github.com/ClickHouse/clickhouse-go/v2"
)

const (
	defaultMaxRowsToCache = 100
)

func newRistrettoCache(maxRowsToCache int64) (cache.Cacher, error) {
	c, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 10 * maxRowsToCache,
		MaxCost:     maxRowsToCache,
		BufferItems: 64,
	})
	if err != nil {
		return nil, err
	}

	return sqlcache.NewRistretto(c), nil
}

var querySQL string = `
		-- @cache-ttl 60
		-- @cache-max-rows 10
		SELECT 1 AS id,
		'aaa' AS name,
		CAST((['Ready', 'Steady', 'Go'], [1, 2, 3]), 'Map(String, UInt8)') AS mapper;`

type customConverter struct{}

func (customConverter) ConvertValue(v any) (driver.Value, error) {
	return v, nil
}

func main() {
	cache, err := newRistrettoCache(defaultMaxRowsToCache)
	if err != nil {
		log.Fatalf("newRistrettoCache() failed: %v", err)
	}

	interceptor, err := sqlcache.NewInterceptor(&sqlcache.Config{
		Cache: cache, // pick a Cacher interface implementation of your choice (redis or ristretto)
	})
	if err != nil {
		log.Fatalf("sqlcache.NewInterceptor() failed: %v", err)
	}

	defer func() {
		fmt.Printf("\nInterceptor metrics: %+v\n", interceptor.Stats())
	}()

	dsn := "clickhouse://localhost:9000"

	conn, err := sql.Open("clickhouse", dsn)
	if err != nil {
		log.Fatal(err)
	}
	// install the wrapper which wraps pgx driver
	sql.Register("clickhouse-sqlcache", interceptor.Driver(conn.Driver()))

	db, err := sql.Open("clickhouse-sqlcache", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err = db.PingContext(context.TODO()); err != nil {
		log.Fatal(fmt.Errorf("db.PingContext() failed: %w", err))
	}

	rows, err := db.QueryContext(context.TODO(), querySQL)
	if err != nil {
		log.Fatal(fmt.Errorf("db.QueryContext() failed: %w", err))
	}
	defer rows.Close()

	for rows.Next() {
		var id int
		var name string
		var mapper interface{} //map[string]uint64
		if err := rows.Scan(&id, &name, &mapper); err != nil {
			log.Fatal(fmt.Errorf("rows.Scan() failed: %w", err))
		}
		fmt.Println("===> ", id, name, mapper)
	}

	db, mock, _ := sqlmock.New(sqlmock.ValueConverterOption(customConverter{}))
	cacheRows := interceptor.CheckCache(context.Background(), querySQL, nil)
	if cacheRows != nil {
		var dests [][]driver.Value
		for {
			dest := make([]driver.Value, len(cacheRows.Columns()))
			if err := cacheRows.Next(dest); err == nil {
				dests = append(dests, dest)
			} else if err == io.EOF {
				break
			} else {
				log.Fatal(err)
			}
		}
                cacheItem, ok := cacheRows.(*sqlcache.RowsCached)
	        if !ok {
		       log.Fatal(fmt.Errorf("cacheRows is not *sqlcache.RowsCached"))
	        }
	        mockColumns := make([]*sqlmock.Column, 0, len(cacheRows.Columns()))
	        for i, column := range cacheRows.Columns() {
		        mockColumns = append(mockColumns, sqlmock.NewColumn(column).OfType(cacheItem.DatabaseTypeNames[i], ""))
	        }
	        mockRows := mock.NewRowsWithColumnDefinition(mockColumns...).AddRows(dests...)
		// mockRows := mock.NewRows(cacheRows.Columns()).AddRows(dests...)
		mock.ExpectQuery(regexp.QuoteMeta(querySQL)).WillReturnRows(mockRows)
		rows, _ := db.Query(querySQL)
		fmt.Println(rows)
  
                type Person struct {
			ID   int    `db:"id"`
			Name string `db:"name"`
			// Mapper map[string]uint64 `db:"mapper"`
		}
		var persons []Person
		err := scan.Rows(&persons, rows)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%#v\n", persons)
	}
}