[SERVER-81298] Streams: "field not found, expected type array" during network failure in MergeOperator Created: 21/Sep/23 Updated: 29/Oct/23 Resolved: 22/Sep/23 |
|
| Status: | Closed |
| Project: | Core Server |
| Component/s: | None |
| Affects Version/s: | None |
| Fix Version/s: | 7.2.0-rc0 |
| Type: | Task | Priority: | Major - P3 |
| Reporter: | Matthew Normyle | Assignee: | Matthew Normyle |
| Resolution: | Fixed | Votes: | 0 |
| Labels: | init-337-m3, prioritize |
| Remaining Estimate: | Not Specified |
| Time Spent: | Not Specified |
| Original Estimate: | Not Specified |
| Assigned Teams: | Atlas Streams |
| Backwards Compatibility: | Fully Compatible |
| Sprint: | Sprint 32 |
| Participants: |
| Description |
mstreams: Invalid bytes in UTF8 string
Caused by a message like:
"{\"$set\":{\"$schema\":\"/mediawiki/recentchange/1.0.0\",\"meta\":{\"uri\":\"https://it.wikipedia.org/wiki/Discussioni_utente:Valepert\",\"request_id\":\"c6d88f7f-9a3c-4b7e-a7cd-421b90ed4a46\",\"id\":\"48472b65-6635-4b65-990c-fcc5e66310e6\",\"dt\":\"2023-09-21T03:05:04Z\",\"domain\":\"it.wikipedia.org\",\"stream\":\"mediawiki.recentchange\",\"topic\":\"codfw.mediawiki.recentchange\",\"partition\": {\"$numberInt\":\"0\"},\"offset\":{\"$numberInt\":\"628263911\"}},\"id\":{\"$numberInt\":\"326192030\"},\"type\":\"edit\",\"namespace\":{\"$numberInt\":\"3\"},\"title\":\"Discussioni utente:Valepert\",\"title_url\":\"https://it.wikipedia.org/wiki/Discussioni_utente:Valepert\",\"comment\":\"\355\240\274\355\275\270\",\"timestamp\":{\"$numberInt\":\"1695265504\"},\"user\":\"Valepert\",\"bot\":false,\"notify_url\":\"https://it.wikipedia.org/w/index.php?diff=135583987&oldid=135530929&rcid=326192030\",\"minor\":false,\"patrolled\":true,\"length\":{\"old\": {\"$numberInt\":\"48232\"},\"new\":{\"$numberInt\":\"48253\"}},\"revision\":{\"old\": {\"$numberInt\":\"135530929\"},\"new\":{\"$numberInt\":\"135583987\"}},\"server_url\":\"https://it.wikipedia.org\",\"server_name\":\"it.wikipedia.org\",\"server_script_path\":\"/w\",\"wiki\":\"itwiki\",\"parsedcomment\":\"\355\240\274\355\275\270\",\"_ts\":{\"$date\":{\"$numberLong\":\"1695272056821\"}},\"_stream_meta\":{\"sourceType\":\"kafka\",\"sourcePartition\": {\"$numberInt\":\"0\"},\"sourceOffset\":{\"$numberLong\":\"10882591\"},\"timestamp\":{\"$date\": {\"$numberLong\":\"1695272056821\"}}},\"_id\":{\"$oid\":\"650c82f98c47efea135f73a3\"}}}"
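The `\355\240\274\355\275\270` runs in `comment` and `parsedcomment` above are octal escapes for the bytes ED A0 BC ED BD B8. Those six bytes are a UTF-16 surrogate pair (U+D83C, U+DF78) with each half encoded separately as three bytes, which strict UTF-8 validation rejects; the intended character appears to be U+1F378. The ticket does not confirm this as the root cause, so the following is only an illustrative Python sketch. The helper names `strict_utf8_ok` and `decode_cesu8_pair` are hypothetical and are not part of mstreams.

```python
# The `comment` bytes exactly as escaped in the message above.
raw = b"\355\240\274\355\275\270"


def strict_utf8_ok(data: bytes) -> bool:
    """Return True if `data` passes strict UTF-8 decoding (hypothetical helper)."""
    try:
        data.decode("utf-8")
        return True
    except UnicodeDecodeError:
        return False


def decode_cesu8_pair(data: bytes) -> str:
    """Decode surrogate halves that were each encoded as 3 bytes (hypothetical helper)."""
    # `surrogatepass` lets the encoded surrogates through as U+D83C U+DF78.
    surrogates = data.decode("utf-8", errors="surrogatepass")
    # A UTF-16 round trip joins the two halves into one code point.
    return surrogates.encode("utf-16", "surrogatepass").decode("utf-16")


if __name__ == "__main__":
    print(raw.hex(" "))                          # ed a0 bc ed bd b8
    print(strict_utf8_ok(raw))                   # False
    print(hex(ord(decode_cesu8_pair(raw))))      # 0x1f378
```

A producer that emits such bytes would need to re-encode them as the four-byte UTF-8 form of the character before a strict parser accepts the document; whether that belongs in the source, in mstreams, or nowhere is left open here.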
This Kafka: kafkacat -C -b kafka.0x2f8.io:9093 -X security.protocol=SASL_PLAINTEXT -X sasl.mechanisms=PLAIN -X sasl.username=mongo -X sasl.password=mongodata_123 -t "topic name"
This streamProcessor:
{
  id: '650bb29b49d37c600ee82da1',
  name: 'kafkaToCollection',
  lastModified: ISODate("2023-09-21T03:03:55.814Z"),
  state: 'FAILED',
  errorMsg: 'resource has no heartbeat',
  pipeline: [
    { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' }},
    { '$merge': { into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1' } } }
  ]
}
Happened after { partitions: [ ] }
Happened again after 10821979
{
  _p: F
  attr: {
    context: {
      streamProcessorId: 650bb29b49d37c600ee82da1
      streamProcessorName: kafkaToCollection
      tenantId: 650761da7df3a953fbe8390c
    }
    error: invalid bytes in UTF8 string: could not parse JSON document
  }
  c: STREAMS
  ctx: thread373
  id: 75897
  kube: { ... }
  msg: encountered exception, exiting runLoop(): {error}
  s: E
  stream: stdout
  t: { ... }
  time: 2023-09-21T03:11:07.151782139Z
}
Splunk search: https://splunk.corp.mongodb.com/en-US/app/cloud/search?earliest=-60m%40m&latest=now&q=search%20index%3Dmhouse-dev%20650bb29b49d37c600ee82da1%20c%3DSTREAMS&display.page.search.mode=smart&dispatch.sample_ratio=1&sid=1695266377.8807063#
mstreams Error: “field not found, expected type array” fails streamProcessor using $merge
Works: sp.process([ { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' }}] )
Fails: sp.process([ { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' }}, { '$merge': { into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1' }} }] )
sp.createStreamProcessor('kafkaToMongoAttempt6', [ { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' }}, { '$merge': { into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1' }} }] )
Starting at: state: { partitions: [ { partition: 0, offset: 10878743 }] }
{
  _p: F
  attr: {
    context: { ... }
    error: field not found, expected type array
    errorCode: 13111
    reason: field not found, expected type array
  }
  c: STREAMS
  ctx: thread1582
  id: 75899
  kube: { ... }
  msg: encountered exception, exiting runLoop(): {error}
  s: W
  stream: stdout
  t: { ... }
  time: 2023-09-21T04:50:41.381178725Z
}
Splunk search: https://splunk.corp.mongodb.com/en-US/app/cloud/search?earliest=-60m%40m&latest=now&q=search%20index%3Dmhouse-dev%20kafkaToMongoAttempt6%20c%3DSTREAMS&display.page.search.mode=smart&dispatch.sample_ratio=1&sid=1695271863.8816704#
Also repros with the pipeline below, around offset 10854764:
sp.process([
  { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' }},
  { '$tumblingWindow': {
      interval: { size: 5, unit: 'second' },
      pipeline: [
        { '$group': { _id: '$title_url', count: { '$count': {} } } },
        { '$sort': { count: -1 }},
        { '$limit': 5 },
        { '$group': { _id: null, top_urls: { $push: { top_urls: { title_url: "$_id", count: "$count" }}} }}
      ]
  } },
  { '$project': { _id: 0 }},
  { '$merge': { into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1Top5PerHour' }} }
] )
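For readers unfamiliar with the stages, the following Python sketch emulates what the inner window pipeline and the trailing `$project` would compute for the events of a single window. It is an illustration under assumptions only: `top_urls_for_window` and the sample events are hypothetical, the order of entries with equal counts is not guaranteed by `$sort`, and the sketch says nothing about why the `$merge` stage fails.

```python
from collections import Counter


def top_urls_for_window(events, limit=5):
    """Emulate the window's inner pipeline plus `$project: {_id: 0}` (hypothetical helper)."""
    # $group by title_url with a $count accumulator.
    counts = Counter(event.get("title_url") for event in events)
    # $sort by count descending, then $limit.
    top = counts.most_common(limit)
    if not top:
        # $group emits no document when it receives no input.
        return []
    # $group with _id: null pushes one wrapper object per URL;
    # $project then drops _id, leaving only the `top_urls` array.
    return [{
        "top_urls": [
            {"top_urls": {"title_url": url, "count": count}}
            for url, count in top
        ]
    }]


if __name__ == "__main__":
    # Made-up sample events, not taken from the wiki1 topic.
    sample = [
        {"title_url": "https://example.org/a"},
        {"title_url": "https://example.org/a"},
        {"title_url": "https://example.org/b"},
    ]
    print(top_urls_for_window(sample))
```

Note that the `$push` expression nests each entry under a second `top_urls` key, so every merged document would carry an array of single-key wrapper objects.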
Field not found, expecting array
error: field not found, expected type array
errorCode: 13111
reason: field not found, expected type array
mstreams async failure just reports “resource has no heartbeat”; we should supply the error message from mstreams
{
  id: '650bb57f49d37c600ee82dbc',
  name: 'kafkaToCollectionAttempt2',
  lastModified: ISODate("2023-09-21T03:16:15.924Z"),
  state: 'FAILED',
  errorMsg: 'resource has no heartbeat',
  pipeline: [
    { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' }},
    { '$merge': { into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1' }} }
  ]
}, |