Details
- New Feature
- Resolution: Gone away
- Major - P3
- None
- 1.0.0
- None
Description
For collection fetching, parallelizing the decoding stage gives performance improvements of up to 40-50% in my case, but it requires some ugly boilerplate code like this:
// Packages used (application-internal packages such as offer, record, and
// parallel are project-specific and not shown): context, time,
// github.com/pkg/errors, go.mongodb.org/mongo-driver/bson,
// go.mongodb.org/mongo-driver/mongo/options, golang.org/x/sync/errgroup.

// Fetch runs the raw fetch and the decoding stage concurrently: fetchRaw
// streams raw BSON documents into a channel while unpack decodes them on
// several goroutines.
func (c *MongoConsumer) Fetch(ctx context.Context, offers chan<- offer.Offer, minUpdatedAt time.Time) error {
    start := time.Now()
    _ = start // duration metric omitted from this excerpt

    raws := make(chan bson.Raw)
    g, ctx := errgroup.WithContext(ctx)
    g.Go(func() error {
        return c.fetchRaw(ctx, raws, minUpdatedAt)
    })
    g.Go(func() error {
        return c.unpack(ctx, raws, offers)
    })
    return g.Wait()
}

// fetchRaw iterates the cursor and sends each raw document to the raws channel.
func (c *MongoConsumer) fetchRaw(ctx context.Context, raws chan<- bson.Raw, minUpdatedAt time.Time) error {
    filter := getFindFilter(minUpdatedAt)
    cur, err := c.collection.Find(ctx, filter, options.Find())
    if err != nil {
        return errors.Wrap(err, "mongo find")
    }
    defer func() {
        _ = cur.Close(ctx)
    }()

    for cur.Next(ctx) {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case raws <- cur.Current:
            c.metrics.FetchBytes("mongo", len(cur.Current))
        }
    }
    close(raws)
    return cur.Err()
}

// unpack decodes raw documents into offers on a pool of worker goroutines.
func (c *MongoConsumer) unpack(ctx context.Context, raws <-chan bson.Raw, result chan<- offer.Offer) error {
    registry := bson.DefaultRegistry
    worker := parallel.NewParallel(func(ctx context.Context, n int) error {
        var o record.Offer
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case doc, ok := <-raws:
                if !ok {
                    return nil
                }
                err := bson.UnmarshalWithRegistry(registry, doc, &o)
                if err != nil {
                    return err
                }
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case result <- o.ToOffer():
                    c.metrics.FetchOffer("mongo")
                }
            }
        }
    }, func(ctx context.Context) {
    })

    return worker.Run(ctx)
}
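The parallel package used in unpack above is an application-internal helper, not part of the driver. A minimal sketch of what it presumably provides, assuming the NewParallel/Run names and a worker-per-CPU default purely from how it is called above:

package parallel

import (
    "context"
    "runtime"

    "golang.org/x/sync/errgroup"
)

// Parallel runs the same worker function on several goroutines and waits for
// all of them to finish. This is only a guess at the internal helper used above.
type Parallel struct {
    worker  func(ctx context.Context, n int) error
    cleanup func(ctx context.Context)
    workers int
}

// NewParallel builds a Parallel with one worker per CPU (assumed default).
func NewParallel(worker func(ctx context.Context, n int) error, cleanup func(ctx context.Context)) *Parallel {
    return &Parallel{worker: worker, cleanup: cleanup, workers: runtime.NumCPU()}
}

// Run starts the workers, waits for all of them, then calls the cleanup hook.
func (p *Parallel) Run(ctx context.Context) error {
    g, ctx := errgroup.WithContext(ctx)
    for i := 0; i < p.workers; i++ {
        n := i // capture the worker index for the closure
        g.Go(func() error { return p.worker(ctx, n) })
    }
    err := g.Wait()
    p.cleanup(ctx)
    return err
}

Each worker receives its index n and the cleanup hook runs once all workers have returned, which matches how unpack calls it.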
It seems the mongo driver should provide an option to make this decoding concurrent, so that this boilerplate code is not needed.
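For illustration only, such a driver-side option might look something like the sketch below. Nothing like ForEachParallel or a decode-concurrency setting exists in the Go driver today; the method name, its signature, and the worker-count parameter are all hypothetical.

// Hypothetical API, for illustration only: the driver keeps reading batches
// while a pool of goroutines runs the callback, so the fan-out/fan-in
// plumbing in Fetch, fetchRaw, and unpack above collapses into one call.
cur, err := c.collection.Find(ctx, getFindFilter(minUpdatedAt))
if err != nil {
    return errors.Wrap(err, "mongo find")
}
defer cur.Close(ctx)

// ForEachParallel does not exist in mongo-go-driver; it sketches the
// requested feature. runtime.NumCPU() is the assumed worker count.
return cur.ForEachParallel(ctx, runtime.NumCPU(), func(doc bson.Raw) error {
    var o record.Offer
    if err := bson.Unmarshal(doc, &o); err != nil {
        return err
    }
    offers <- o.ToOffer()
    return nil
})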