[GODRIVER-954] Need a way to parallelize decoding stage
Created: 10/Apr/19  Updated: 27/Oct/23  Resolved: 09/Sep/22

Status: Closed
Project: Go Driver
Component/s: CRUD
Affects Version/s: 1.0.0
Fix Version/s: None

Type: New Feature
Priority: Major - P3
Reporter: Alexander
Assignee: Matt Dale
Resolution: Gone away
Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

When fetching a collection, parallelizing the decoding stage improves performance by up to 40-50% in my case, but it requires ugly boilerplate code like this:

func (c *MongoConsumer) Fetch(ctx context.Context, offers chan<- offer.Offer, minUpdatedAt time.Time) error {
   start := time.Now()
   raws := make(chan bson.Raw)
   g, ctx := errgroup.WithContext(ctx)
   g.Go(func() error {
      return c.fetchRaw(ctx, raws, minUpdatedAt)
   })
   g.Go(func() error {
      return c.unpack(ctx, raws, offers)
   })
   return g.Wait()
}
 
func (c *MongoConsumer) fetchRaw(ctx context.Context, raws chan<- bson.Raw, minUpdatedAt time.Time) error {
   filter := getFindFilter(minUpdatedAt)
   cur, err := c.collection.Find(ctx, filter, options.Find())
   if err != nil {
      return errors.Wrap(err, "mongo find")
   }
   defer func() {
      _ = cur.Close(ctx)
   }()
 
   for cur.Next(ctx) {
      select {
      case <-ctx.Done():
         return ctx.Err()
      case raws <- cur.Current:
         c.metrics.FetchBytes("mongo", len(cur.Current))
      }
   }
   close(raws)
   return cur.Err()
}
 
func (c *MongoConsumer) unpack(ctx context.Context, raws <-chan bson.Raw, result chan<- offer.Offer) error {
   registry := bson.DefaultRegistry
   worker := parallel.NewParallel(func(ctx context.Context, n int) error {
      var o record.Offer
      for {
         select {
         case <-ctx.Done():
            return ctx.Err()
         case doc, ok := <-raws:
            if !ok {
               return nil
            }
            err := bson.UnmarshalWithRegistry(registry, doc, &o)
            if err != nil {
               return err
            }
            select {
            case <-ctx.Done():
               return ctx.Err()
            case result <- o.ToOffer():
               c.metrics.FetchOffer("mongo")
            }
         }
      }
 
   }, func(ctx context.Context) {
   })
 
   return worker.Run(ctx)
}

It seems that the Mongo driver should offer an option to make decoding concurrent, so that this boilerplate is no longer needed.
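
To make the request concrete, here is a minimal sketch of what a reusable concurrent-decoding helper could look like. Nothing in it is a Go Driver API: decodeParallel, Offer and run are hypothetical names, and encoding/json stands in for the BSON decoder so that the sketch runs with the standard library alone. It assumes every []byte sent on the input channel is an independent copy that is not modified after sending.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
)

// Offer is a hypothetical stand-in for the decoded document type.
type Offer struct {
	ID    int    `json:"id"`
	Title string `json:"title"`
}

// decodeParallel is a hypothetical helper, not a driver API. It decodes
// documents from in on `workers` goroutines and sends the results to out.
// It returns the first error reported by a worker, or nil.
func decodeParallel[T any](
	ctx context.Context,
	in <-chan []byte,
	out chan<- T,
	workers int,
	decode func([]byte, *T) error,
) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errc := make(chan error, workers) // each worker sends at most one error
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					errc <- ctx.Err()
					return
				case raw, ok := <-in:
					if !ok {
						return // input exhausted
					}
					var v T // fresh value per document
					if err := decode(raw, &v); err != nil {
						errc <- err
						cancel() // stop the other workers
						return
					}
					select {
					case <-ctx.Done():
						errc <- ctx.Err()
						return
					case out <- v:
					}
				}
			}
		}()
	}
	wg.Wait()
	close(errc)
	return <-errc // nil if no worker reported an error
}

// run feeds five JSON documents through decodeParallel and sums their IDs.
func run() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // releases the producer if decoding stops early

	in := make(chan []byte)
	out := make(chan Offer)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			doc := []byte(fmt.Sprintf(`{"id":%d,"title":"offer-%d"}`, i, i))
			select {
			case <-ctx.Done():
				return
			case in <- doc:
			}
		}
	}()

	errc := make(chan error, 1)
	go func() {
		errc <- decodeParallel[Offer](ctx, in, out, 4, func(b []byte, o *Offer) error {
			return json.Unmarshal(b, o)
		})
		close(out)
	}()

	sum := 0
	for o := range out { // results arrive in no particular order
		sum += o.ID
	}
	return sum, <-errc
}

func main() {
	sum, err := run()
	fmt.Println(sum, err) // prints "15 <nil>"
}
```

With a helper of this shape, the unpack function above would reduce to a single call that passes a BSON decode function. Note that results are not delivered in cursor order, which may or may not matter for a given use case.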



 Comments   
Comment by PM Bot [ 09/Sep/22 ]

There hasn't been any recent activity on this ticket, so we're resolving it. Thanks for reaching out! Please feel free to comment on this if you're able to provide more information.

Comment by Matt Dale [ 25/Aug/22 ]

Hey abonec, thanks for the ticket, and apologies for the really slow response! That's a great observation that advancing a Cursor and unmarshaling values concurrently can improve performance. However, I'm not sure that the proposed improvement will offer better performance in all cases. The performance improvement seems highly dependent on the balance between operation latency and unmarshal latency, as well as the number of documents returned, which may vary wildly for different users and use cases.

I've got some questions to help me understand your use case better:

  • How many documents do your Find operations typically match?
  • What is a typical latency for a Find operation?
  • How large (in bytes) is a typical bson.Raw document that you're unmarshaling to an offer.Offer?
  • What fields are in the offer.Offer struct?
  • What version of the Go Driver are you using?

Concerning the example code you included, I noticed that it reads the Cursor.Current value concurrently with advancing the cursor (i.e. calling Cursor.Next), which may lead to data races or other unexpected results if the Current byte slice is reused for a subsequent cursor value. You should always make a copy of the Current value if you are going to possibly advance the cursor before reading the value of Current.

E.g.

func (c *MongoConsumer) fetchRaw(...) error {
	// ...
	for cur.Next(ctx) {
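		// Make a copy of cur.Current, because the cursor may reuse the
		// underlying byte slice on the next call to cur.Next.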
		raw := make(bson.Raw, len(cur.Current))
		copy(raw, cur.Current)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case raws <- raw:
			c.metrics.FetchBytes("mongo", len(cur.Current))
		}
	}
	// ...
}
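
The aliasing problem can be reproduced without a database. The sketch below uses a hypothetical fakeCursor that deliberately reuses one buffer for every document; it is an assumption made for illustration and does not reflect how any particular driver version manages Cursor.Current. The collect helper is likewise hypothetical. It shows that slices retained without a copy all end up reading the last document, while copied slices keep their own contents.

```go
package main

import "fmt"

// fakeCursor is a hypothetical stand-in for a driver cursor. It reuses the
// same backing array for Current on every call to Next, which is the
// situation the copy guards against.
type fakeCursor struct {
	docs    []string
	pos     int
	Current []byte
}

// Next loads the next document into Current, overwriting the previous one.
func (c *fakeCursor) Next() bool {
	if c.pos >= len(c.docs) {
		return false
	}
	c.Current = append(c.Current[:0], c.docs[c.pos]...)
	c.pos++
	return true
}

// collect retains every document seen by the cursor, with or without
// copying Current first, and returns what the retained slices contain
// once iteration has finished.
func collect(docs []string, copyCurrent bool) []string {
	cur := &fakeCursor{docs: docs}
	var held [][]byte
	for cur.Next() {
		raw := cur.Current
		if copyCurrent {
			raw = make([]byte, len(cur.Current))
			copy(raw, cur.Current)
		}
		held = append(held, raw)
	}
	out := make([]string, len(held))
	for i, b := range held {
		out[i] = string(b)
	}
	return out
}

func main() {
	docs := []string{"aaaa", "bbbb", "cccc"}
	fmt.Println(collect(docs, false)) // prints "[cccc cccc cccc]"
	fmt.Println(collect(docs, true))  // prints "[aaaa bbbb cccc]"
}
```

When the retained slices are read from another goroutine while the cursor advances, the same sharing becomes a data race rather than just stale data, which is why the copy has to happen before the value is sent on the channel.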
