-
Type:
New Feature
-
Resolution: Gone away
-
Priority:
Major - P3
-
None
-
Affects Version/s: 1.0.0
-
Component/s: CRUD
-
None
-
None
-
None
-
None
-
None
-
None
-
None
-
None
When fetching a collection, parallelizing the decoding stage gives a performance improvement of up to 40-50% in my case, but it requires some ugly boilerplate code like this:
func (c *MongoConsumer) Fetch(ctx context.Context, offers chan<- offer.Offer, minUpdatedAt time.Time) error { start := time.Now() raws := make(chan bson.Raw) g, ctx := errgroup.WithContext(ctx) g.Go(func() error { return c.fetchRaw(ctx, raws, minUpdatedAt) }) g.Go(func() error { return c.unpack(ctx, raws, offers) }) return g.Wait() } func (c *MongoConsumer) fetchRaw(ctx context.Context, raws chan<- bson.Raw, minUpdatedAt time.Time) error { filter := getFindFilter(minUpdatedAt) cur, err := c.collection.Find(ctx, filter, options.Find()) if err != nil { return errors.Wrap(err, "mongo find") } defer func() { _ = cur.Close(ctx) }() for cur.Next(ctx) { select { case <-ctx.Done(): return ctx.Err() case raws <- cur.Current: c.metrics.FetchBytes("mongo", len(cur.Current)) } } close(raws) return cur.Err() } func (c *MongoConsumer) unpack(ctx context.Context, raws <-chan bson.Raw, result chan<- offer.Offer) error { registry := bson.DefaultRegistry worker := parallel.NewParallel(func(ctx context.Context, n int) error { var o record.Offer for { select { case <-ctx.Done(): return ctx.Err() case doc, ok := <-raws: if !ok { return nil } err := bson.UnmarshalWithRegistry(registry, doc, &o) if err != nil { return err } select { case <-ctx.Done(): return ctx.Err() case result <- o.ToOffer(): c.metrics.FetchOffer("mongo") } } } }, func(ctx context.Context) { }) return worker.Run(ctx) }
It seems the mongo driver should offer an option to make decoding concurrent, so that this boilerplate becomes unnecessary.