What I am trying to achieve is the following.
In Amazon Connect, when I create a flow with a Start Streaming Data block, it starts streaming to a Kinesis Video Stream, and I can call a Lambda function with the stream's information.
Next, I want this Lambda function to stream some audio into the Kinesis Video Stream so that the customer on the phone can hear it.
I have created the following Python script, but I cannot hear the audio on the phone call. I have looked at the Connect error logs, and no errors are occurring. A fix for this script, or any examples of a similar pipeline, would be awesome.
import boto3
import botocore.exceptions
import types
from functools import lru_cache
import time
import io
import wave
class KinesisVideo(object):
    """Single entry point for the Kinesis Video control and data APIs.

    Every client method name discovered on the underlying boto3 clients is
    exposed as an attribute of this object. Calls to control-plane methods go
    to the ``kinesisvideo`` client; calls to data-plane methods first resolve
    the stream's data endpoint (via ``get_data_endpoint``) and then go to a
    client bound to that endpoint.

    Lookups (stream ARN, data endpoint, client objects) are memoised per
    instance, so discarding the instance also discards its cached clients.
    """

    _CONTROL_SERVICES = ('kinesisvideo', )
    _DATA_SERVICES = ('kinesis-video-media', 'kinesis-video-archived-media')

    def __init__(self, session=None):
        """Build the method -> service routing table.

        Args:
            session: object exposing ``client(service, endpoint_url=...)``;
                a fresh ``boto3.Session()`` is created when omitted.
        """
        self._session = session or boto3.Session()
        self._methods = {}
        # Per-instance memo tables. A class-level lru_cache on the methods
        # would key on ``self`` and keep every instance alive.
        self._arn_cache = {}
        self._endpoint_cache = {}
        self._client_cache = {}
        for service in self._CONTROL_SERVICES + self._DATA_SERVICES:
            prototype = self._get_client_for_service(service)
            for method in prototype.meta.method_to_api_mapping:
                # Later services win if two services share a method name.
                self._methods[method] = service

    def _get_arn_for_stream_name(self, stream_name):
        """Return the stream ARN for ``stream_name`` (memoised)."""
        if stream_name not in self._arn_cache:
            response = self.describe_stream(StreamName=stream_name)
            self._arn_cache[stream_name] = response['StreamInfo']['StreamARN']
        return self._arn_cache[stream_name]

    def _get_endpoint_for_stream_method(self, stream_arn, method):
        """Return the data endpoint URL for ``method`` on ``stream_arn`` (memoised).

        The upper-cased method name is passed as ``APIName``.
        """
        key = (stream_arn, method)
        if key not in self._endpoint_cache:
            response = self.get_data_endpoint(
                StreamARN=stream_arn, APIName=method.upper())
            self._endpoint_cache[key] = response['DataEndpoint']
        return self._endpoint_cache[key]

    def _get_client_for_service(self, service, endpoint_url=None):
        """Return a client for ``service`` bound to ``endpoint_url`` (memoised).

        ``kinesis-video-media`` clients are patched to add ``put_media``.
        """
        key = (service, endpoint_url)
        if key not in self._client_cache:
            client = self._session.client(service, endpoint_url=endpoint_url)
            if service == 'kinesis-video-media':
                client = self._patch_kinesis_video_media(client)
            self._client_cache[key] = client
        return self._client_cache[key]

    def _get_client_by_arguments(self, method, stream_name=None, stream_arn=None):
        """Pick the client that must serve ``method`` for the given stream.

        Raises:
            botocore.exceptions.ParamValidationError: for a data-plane method
                when neither or both of ``stream_name`` / ``stream_arn`` are
                supplied.
        """
        service = self._methods[method]
        if service not in self._DATA_SERVICES:
            return self._get_client_for_service(service)
        # Exactly one of the two identifiers must be given.
        if not (bool(stream_name) ^ bool(stream_arn)):
            raise botocore.exceptions.ParamValidationError(report=
                'One of StreamName or StreamARN must be defined ' +
                'to determine service endpoint'
            )
        stream_arn = self._get_arn_for_stream_name(stream_name) if stream_name else stream_arn
        endpoint_url = self._get_endpoint_for_stream_method(stream_arn, method)
        return self._get_client_for_service(service, endpoint_url)

    def __getattr__(self, method):
        """Return a callable that forwards ``method`` to the right client.

        Raises:
            AttributeError: if ``method`` is not a known client method.
        """
        # __getattr__ only runs after normal lookup failed. Reading _methods
        # through __dict__ avoids infinite recursion on instances whose
        # __init__ has not run (e.g. created via __new__, copy or unpickle).
        known_methods = self.__dict__.get('_methods', {})
        if method not in known_methods:
            raise AttributeError(
                '{!r} object has no attribute {!r}'.format(
                    type(self).__name__, method))
        kwarg_map = {'StreamName': 'stream_name', 'StreamARN': 'stream_arn'}

        def _api_call(**kwargs):
            # Only the stream identifiers influence client selection; the
            # full kwargs are forwarded to the client untouched.
            filtered_kwargs = {kwarg_map[k]: v for k, v in kwargs.items() if k in kwarg_map}
            client = self._get_client_by_arguments(method, **filtered_kwargs)
            return getattr(client, method)(**kwargs)

        return _api_call

    @staticmethod
    def _patch_kinesis_video_media(client):
        """Register a ``PutMedia`` operation on ``client`` and return it.

        Writes the operation and its input/output shapes directly into the
        client's service model, then binds a ``put_media`` method that calls
        ``_make_api_call('PutMedia', kwargs)``.

        NOTE(review): this reaches into underscore-prefixed attributes of the
        service model, and refers to the shapes ``Timestamp``,
        ``ResourceARN``, ``StreamName`` and ``Payload`` without defining
        them - presumably they already exist in the model; confirm against
        the installed botocore version.
        """
        client.meta.service_model._service_description['operations']['PutMedia'] = {
            'name': 'PutMedia',
            'http': {'method': 'POST', 'requestUri': '/putMedia'},
            'input': {'shape': 'PutMediaInput'},
            'output': {'shape': 'PutMediaOutput'},
            'errors': [
                {'shape': 'ResourceNotFoundException'},
                {'shape': 'NotAuthorizedException'},
                {'shape': 'InvalidEndpointException'},
                {'shape': 'ClientLimitExceededException'},
                {'shape': 'ConnectionLimitExceededException'},
                {'shape': 'InvalidArgumentException'}
            ],
            'authtype': 'v4-unsigned-body',
        }
        client.meta.service_model._shape_resolver._shape_map['PutMediaInput'] = {
            'type': 'structure',
            'required': ['FragmentTimecodeType', 'ProducerStartTimestamp'],
            'members': {
                'FragmentTimecodeType': {
                    'shape': 'FragmentTimecodeType',
                    'location': 'header',
                    'locationName': 'x-amzn-fragment-timecode-type',
                },
                'ProducerStartTimestamp': {
                    'shape': 'Timestamp',
                    'location': 'header',
                    'locationName': 'x-amzn-producer-start-timestamp',
                },
                'StreamARN': {
                    'shape': 'ResourceARN',
                    'location': 'header',
                    'locationName': 'x-amzn-stream-arn',
                },
                'StreamName': {
                    'shape': 'StreamName',
                    'location': 'header',
                    'locationName': 'x-amzn-stream-name',
                },
                'Payload': {
                    'shape': 'Payload',
                },
            },
            'payload': 'Payload',
        }
        client.meta.service_model._shape_resolver._shape_map['PutMediaOutput'] = {
            'type': 'structure',
            'members': {'Payload': {'shape': 'Payload'}},
            'payload': 'Payload',
        }
        client.meta.service_model._shape_resolver._shape_map['FragmentTimecodeType'] = {
            'type': 'string',
            'enum': ['ABSOLUTE', 'RELATIVE'],
        }
        client.put_media = types.MethodType(
            lambda self, **kwargs: self._make_api_call('PutMedia', kwargs),
            client,
        )
        client.meta.method_to_api_mapping['put_media'] = 'PutMedia'
        return client
def main():
    """Upload a local MKV file to a Kinesis Video stream and print the acks.

    Lists the available streams, sends ``packages/python/output.mkv`` to
    ``MY_STREAM_NAME`` through ``put_media`` and prints each JSON
    acknowledgement line found in the response payload.

    NOTE(review): writing media into the stream is not known to make Amazon
    Connect play that audio to the caller - confirm before relying on it.
    """
    # Kept function-local so this block does not depend on the module's
    # import list.
    import json

    # NOTE(review): placeholder values. Do not commit real credentials;
    # prefer the default credential chain (environment, profile, IAM role).
    session = boto3.Session(
        aws_access_key_id='MY_ACCESS_KEY_ID',
        aws_secret_access_key='MY_SECRET_ACCESS_KEY',
        region_name='MY_REGION'
    )
    video = KinesisVideo(session=session)
    print(video.list_streams())

    # Whole seconds since the epoch, as a string.
    start_tmstp = str(int(time.time()))
    print(start_tmstp)

    # PutMedia consumes Matroska (MKV) data, so the file is sent unmodified.
    # Re-wrapping the bytes with the ``wave`` module would prepend a WAV
    # header to data that is not PCM and leave the buffer positioned at its
    # end, so the request body would not be the MKV stream.
    with open('packages/python/output.mkv', 'rb') as payload_file:
        response = video.put_media(
            StreamName='MY_STREAM_NAME',
            Payload=payload_file,
            FragmentTimecodeType='RELATIVE',
            ProducerStartTimestamp=start_tmstp,
        )
        ack_text = response['Payload'].read().decode('utf-8')

    for line in ack_text.splitlines():
        # Blank separator lines are not JSON documents; skip them.
        if not line.strip():
            continue
        print(json.loads(line))
# Run the upload only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
2
Answers
This isn’t currently possible. There is no way to stream audio into Connect.
I am also looking for the same thing. Is it possible to stream audio to Amazon Connect? For example, when a customer calls in through Connect, we hand off that stream to an external application using KVS and send an audio response from the external application back to the customer through the same KVS. Is this possible? Genesys has a similar feature called Audio Connector, which allows for bidirectional streams.