通过 GORM 的 ClickHouse 驱动(gorm.io/driver/clickhouse)批量插入数据时,如果数据条数超过批大小(例如对 110 条数据调用 CreateInBatches(reports, 100)),就会报以下错误:

[clickhouse][conn=1][127.0.0.1:9000][exception] code: 101, message: Unexpected packet Query received from client
[clickhouse-std][conn=0][127.0.0.1:9000] PrepareContext error: code: 101, message: Unexpected packet Query received from client

复现该问题的测试代码如下:

package main

import (
	"crypto/md5"
	"fmt"
	"io"
	"time"

	std_ck "github.com/ClickHouse/clickhouse-go/v2"
	"gorm.io/datatypes"
	"gorm.io/driver/clickhouse"
	"gorm.io/gorm"
)

// AdTencentReportCK is the GORM model for one report row; its table name is
// supplied by the TableName method. Data carries a JSON document
// (datatypes.JSON).
//
// NOTE(review): IngestTime is described as millisecond-precision, but its tag
// is plain `autoCreateTime` (GORM documents `autoCreateTime:milli` for
// milliseconds) and main assigns a 10-digit value — confirm the intended unit.
type AdTencentReportCK struct {
	AppID      int64  `json:"app_id"`                            // funnydb application ID
	ReportID   string `json:"report_id"`                         // report ID
	ReportType string `json:"report_type"`                       // report type
	Level      string `json:"level"`                             // level of the report type
	TimeLine   string `json:"time_line"`                         // time basis used for the statistics
	AccountId  int64  `json:"account_id"`                        // account ID
	Date       string `json:"date"`                              // date
	Hour       int64  `json:"hour"`                              // hour of day (0-23)
	IngestTime int64  `json:"ingest_time" gorm:"autoCreateTime"` // ingest time, millisecond precision (see NOTE above)
	Data       datatypes.JSON
}

// TableName reports the table that AdTencentReportCK rows are stored in.
// GORM uses this method, when present, instead of deriving a name from the
// struct type.
func (AdTencentReportCK) TableName() string {
	const table = "ad_tencent_reports_test"
	return table
}

// GenerateReportID returns the lowercase hexadecimal MD5 digest of all keys
// written to the hash back to back, in order.
//
// The keys are joined without any separator, so different key lists that
// concatenate to the same string (e.g. ("a", "bc") and ("abc")) yield the
// same ID. With no keys, the result is the MD5 of the empty input.
func GenerateReportID(keys ...string) string {
	hasher := md5.New()
	for i := range keys {
		// hash.Hash documents that Write never returns an error, so the
		// result is intentionally discarded.
		_, _ = io.WriteString(hasher, keys[i])
	}
	digest := hasher.Sum(nil)
	return fmt.Sprintf("%x", digest)
}

// main reproduces the ClickHouse "code: 101, message: Unexpected packet Query
// received from client" failure. It opens a database/sql handle with
// clickhouse-go, wraps it in GORM, builds 110 rows and inserts them with
// CreateInBatches using a batch size of 100, so the rows do not fit into a
// single batch. Any error is fatal (panic).
func main() {
	const (
		rowCount  = 110 // deliberately larger than batchSize
		batchSize = 100
	)

	opts := &std_ck.Options{
		Addr: []string{"127.0.0.1:9000"},
		Auth: std_ck.Auth{
			Database: "ad_reports",
			Username: "admin",
			Password: "secret",
		},
		Settings: std_ck.Settings{
			"max_execution_time": 3600,
			"final":              1,
		},
		DialTimeout: 5 * time.Second,
		Compression: &std_ck.Compression{
			Method: std_ck.CompressionLZ4,
		},
		Debug: true,
	}
	sqlDB := std_ck.OpenDB(opts)

	// Hand the existing database/sql handle to the GORM ClickHouse driver.
	dialector := clickhouse.New(clickhouse.Config{Conn: sqlDB})
	db, err := gorm.Open(dialector)
	if err != nil {
		panic(err)
	}

	// Rows are numbered 1..rowCount; the number feeds both the report ID and
	// the account ID.
	reports := make([]*AdTencentReportCK, 0, rowCount)
	for n := 1; n <= rowCount; n++ {
		reports = append(reports, &AdTencentReportCK{
			AppID:      1,
			ReportID:   GenerateReportID("1", fmt.Sprintf("%d", n)),
			ReportType: "daily_reports",
			Level:      "REPORT_LEVEL_ADVERTISER",
			TimeLine:   "ACTIVE_TIME",
			AccountId:  int64(n),
			Date:       "2025-01-01",
			IngestTime: 1763308800,
			Data:       datatypes.JSON(`{"cost": 1000}`),
		})
	}

	// This is the call under investigation: more rows than batchSize.
	result := db.CreateInBatches(reports, batchSize)
	if result.Error != nil {
		panic(result.Error)
	}
}

解决方式:不要使用 CreateInBatches,改为直接调用 Create 一次性插入全部数据:

// Workaround: insert the whole slice with a single Create call instead of
// CreateInBatches.
if err := ckDB.Create(reports).Error; err != nil {
	panic(err)
}

注意事项

在同一个事务中多次调用 Create 插入数据也会触发这个错误,例如:

// Anti-example: calling Create more than once on the same transaction handle
// triggers the same "code: 101" error as CreateInBatches.
if err := ckDB.Transaction(func(tx *gorm.DB) error {
	for _, _reports := range reports {
		// One Create call per element, all issued on the same tx.
		if err := tx.Create(_reports).Error; err != nil {
			return err
		}
	}
	return nil
}); err != nil {
	panic(err)
}