skip to Main Content

What I am trying to achieve is the following.

In Amazon Connect, when I create a Flow with Start Streaming Data block, it starts streaming to a Kinesis Video Stream, and I can call a lambda function with the streams information.

Next, I want this lambda function to stream some audio into the Kinesis Video Stream, so the customer on the phone can hear it.

I have created the following python script, but I cannot hear the audio on the phone call. I have looked at the error logs of connect, and there are no errors that are occuring. A solution to fix this script, or any examples of a similar pipeline would be awesome.

import boto3
import botocore.exceptions
import types
from functools import lru_cache
import time
import io
import wave

class KinesisVideo(object):
    _CONTROL_SERVICES = ('kinesisvideo', )
    _DATA_SERVICES = ('kinesis-video-media', 'kinesis-video-archived-media')

    def __init__(self, session=None):
        self._session = session or boto3.Session()
        self._methods = {}
        for service in self._CONTROL_SERVICES + self._DATA_SERVICES:
            prototype = self._get_client_for_service(service)
            for method in prototype.meta.method_to_api_mapping.keys():
                self._methods[method] = service

    @lru_cache()
    def _get_arn_for_stream_name(self, stream_name):
        response = self.describe_stream(StreamName=stream_name)
        return response['StreamInfo']['StreamARN']

    @lru_cache()
    def _get_endpoint_for_stream_method(self, stream_arn, method):
        response = self.get_data_endpoint(StreamARN=stream_arn, APIName=method.upper())
        return response['DataEndpoint']

    @lru_cache()
    def _get_client_for_service(self, service, endpoint_url=None):
        client = self._session.client(service, endpoint_url=endpoint_url)
        if service == 'kinesis-video-media':
            client = self._patch_kinesis_video_media(client)
        return client

    @lru_cache()
    def _get_client_by_arguments(self, method, stream_name=None, stream_arn=None):
        service = self._methods[method]
        if service not in self._DATA_SERVICES:
            return self._get_client_for_service(service)
        if not (bool(stream_name) ^ bool(stream_arn)):
            raise botocore.exceptions.ParamValidationError(report=
                'One of StreamName or StreamARN must be defined ' + 
                'to determine service endpoint'
            )
        stream_arn = self._get_arn_for_stream_name(stream_name) if stream_name else stream_arn
        endpoint_url = self._get_endpoint_for_stream_method(stream_arn, method)
        return self._get_client_for_service(service, endpoint_url)

    def __getattr__(self, method):
        if method not in self._methods:
            return getattr(super(), method)
        kwarg_map = {'StreamName': 'stream_name', 'StreamARN': 'stream_arn'}
        def _api_call(**kwargs):
            filtered_kwargs = {kwarg_map[k]: v for k, v in kwargs.items() if k in kwarg_map}
            client = self._get_client_by_arguments(method, **filtered_kwargs)
            return getattr(client, method)(**kwargs)
        return _api_call

    @staticmethod
    def _patch_kinesis_video_media(client):
        client.meta.service_model._service_description['operations']['PutMedia'] = {
            'name': 'PutMedia',
            'http': {'method': 'POST', 'requestUri': '/putMedia'},
            'input': {'shape': 'PutMediaInput'},
            'output': {'shape': 'PutMediaOutput'},
            'errors': [
                {'shape': 'ResourceNotFoundException'},
                {'shape': 'NotAuthorizedException'},
                {'shape': 'InvalidEndpointException'},
                {'shape': 'ClientLimitExceededException'},
                {'shape': 'ConnectionLimitExceededException'},
                {'shape': 'InvalidArgumentException'}
            ],
            'authtype': 'v4-unsigned-body',
        }
        client.meta.service_model._shape_resolver._shape_map['PutMediaInput'] = {
            'type': 'structure',
            'required': ['FragmentTimecodeType', 'ProducerStartTimestamp'],
            'members': {
                'FragmentTimecodeType': {
                    'shape': 'FragmentTimecodeType',
                    'location': 'header',
                    'locationName': 'x-amzn-fragment-timecode-type',
                },
                'ProducerStartTimestamp': {
                    'shape': 'Timestamp',
                    'location': 'header',
                    'locationName': 'x-amzn-producer-start-timestamp',
                },
                'StreamARN': {
                    'shape': 'ResourceARN',
                    'location': 'header',
                    'locationName': 'x-amzn-stream-arn',
                },
                'StreamName': {
                    'shape': 'StreamName',
                    'location': 'header',
                    'locationName': 'x-amzn-stream-name',
                },
                'Payload': {
                    'shape': 'Payload',
                },
            },
            'payload': 'Payload',
        }
        client.meta.service_model._shape_resolver._shape_map['PutMediaOutput'] = {
            'type': 'structure',
            'members': {'Payload': {'shape': 'Payload'}},
            'payload': 'Payload',
        }
        client.meta.service_model._shape_resolver._shape_map['FragmentTimecodeType'] = {
            'type': 'string',
            'enum': ['ABSOLUTE', 'RELATIVE'],
        }
        client.put_media = types.MethodType(
            lambda self, **kwargs: self._make_api_call('PutMedia', kwargs),
            client,
        )
        client.meta.method_to_api_mapping['put_media'] = 'PutMedia'
        return client

def main():
    session = boto3.Session(
        aws_access_key_id='MY_ACCESS_KEY_ID',
        aws_secret_access_key='MY_SECRET_ACCESS_KEY',
        region_name='MY_REGION'
    )

    video = KinesisVideo(session=session)

    print(video.list_streams())
    
    start_tmstp = repr(time.time()).split('.')[0]

    print(start_tmstp)

    # Ensure the audio data is correctly handled for Amazon Connect
    with open('packages/python/output.mkv', 'rb') as payload_file:
        payload_data = payload_file.read()

    # Create a PCM file from the audio data
    pcm_data = io.BytesIO(payload_data)
    with wave.open(pcm_data, 'wb') as wav_file:
        wav_file.setnchannels(1)  # Mono
        wav_file.setsampwidth(2)  # 2 bytes per sample
        wav_file.setframerate(8000)  # Sample rate
        wav_file.writeframes(payload_data)

    response = video.put_media(
        StreamName='MY_STREAM_NAME',
        Payload=pcm_data,
        FragmentTimecodeType='RELATIVE',
        ProducerStartTimestamp=start_tmstp,
    )

    import json
    for event in map(json.loads, response['Payload'].read().decode('utf-8').splitlines()):
        print(event)

if __name__ == '__main__':
    main()

2

Answers


  1. This isn’t currently possible. There is no way to stream audio into Connect.

    Login or Signup to reply.
  2. I am also looking for the same, is it possible to stream audio to Amazon Connect? like when a customer calls in through Connect, we handoff that stream to an external application using KVS and send back an audio response from external application through the same KVS back to the customer. Is this possible? Genesys has a similar feature called Audio Connector which allows for bi-directional streams.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search