Uploaded image for project: 'Go Driver'
  1. Go Driver
  2. GODRIVER-954

Need a way to parallelize the decoding stage

    XMLWordPrintableJSON

Details

    • Type: New Feature
    • Resolution: Gone away
    • Priority: Major - P3
    • None
    • 1.0.0
    • CRUD
    • None

    Description

      When fetching a whole collection, parallelizing the decoding stage gives performance improvements of up to 40–50% in my case, but it requires some ugly boilerplate code like this:

      // Fetch streams the offers selected by getFindFilter(minUpdatedAt) into offers.
      //
      // It runs a two-stage pipeline under a single errgroup: fetchRaw reads raw
      // BSON documents from the cursor, and unpack decodes them concurrently. The
      // errgroup context is cancelled as soon as either stage returns an error, and
      // Fetch returns the first such error (nil on success). Fetch does not close
      // offers; that remains the caller's responsibility.
      func (c *MongoConsumer) Fetch(ctx context.Context, offers chan<- offer.Offer, minUpdatedAt time.Time) error {
         // raws hands undecoded documents from the cursor reader to the decoders.
         raws := make(chan bson.Raw)
         g, ctx := errgroup.WithContext(ctx)
         g.Go(func() error {
            return c.fetchRaw(ctx, raws, minUpdatedAt)
         })
         g.Go(func() error {
            return c.unpack(ctx, raws, offers)
         })
         return g.Wait()
      }
       
      func (c *MongoConsumer) fetchRaw(ctx context.Context, raws chan<- bson.Raw, minUpdatedAt time.Time) error {
         filter := getFindFilter(minUpdatedAt)
         cur, err := c.collection.Find(ctx, filter, options.Find())
         if err != nil {
            return errors.Wrap(err, "mongo find")
         }
         defer func() {
            _ = cur.Close(ctx)
         }()
       
         for cur.Next(ctx) {
            select {
            case <-ctx.Done():
               return ctx.Err()
            case raws <- cur.Current:
               c.metrics.FetchBytes("mongo", len(cur.Current))
            }
         }
         close(raws)
         return cur.Err()
      }
       
      func (c *MongoConsumer) unpack(ctx context.Context, raws <-chan bson.Raw, result chan<- offer.Offer) error {
         registry := bson.DefaultRegistry
         worker := parallel.NewParallel(func(ctx context.Context, n int) error {
            var o record.Offer
            for {
               select {
               case <-ctx.Done():
                  return ctx.Err()
               case doc, ok := <-raws:
                  if !ok {
                     return nil
                  }
                  err := bson.UnmarshalWithRegistry(registry, doc, &o)
                  if err != nil {
                     return err
                  }
                  select {
                  case <-ctx.Done():
                     return ctx.Err()
                  case result <- o.ToOffer():
                     c.metrics.FetchOffer("mongo")
                  }
               }
            }
       
         }, func(ctx context.Context) {
         })
       
         return worker.Run(ctx)
      }
      

      It seems the Mongo driver should offer an option to decode concurrently, which would remove the need for this boilerplate.

      Attachments

        Activity

          People

            matt.dale@mongodb.com Matt Dale
            abonec Alexander
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: