import asyncio
import parsl

import asyncio
import academy

from academy.agent import Agent, action
from academy.manager import Manager
from academy.exchange import LocalExchangeFactory, RedisExchangeFactory
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import logging

# Named loggers: "benc" for this script and "asyncio" for the asyncio
# library's logger. Neither is referenced again in the code visible here.
logger = logging.getLogger("benc")
async_logger = logging.getLogger("asyncio")

# parsl.set_stream_logger(name="")

from academy.handle import exchange_context

@parsl.join_app
def fibs(n, fib_agent_handle, fib_exchange_context):
    """Ask the Fibonacci agent for ``fib(n)`` from inside a Parsl join app.

    The request is wrapped in a coroutine and scheduled on the module-global
    asyncio ``loop``; the ``concurrent.futures.Future`` for that coroutine is
    returned as the join app's inner future.

    Args:
        n: Index forwarded to ``fib_agent_handle.fib``.
        fib_agent_handle: Handle exposing an awaitable ``fib`` action.
        fib_exchange_context: Value installed in the ``exchange_context``
            context variable while the agent call is in flight.

    Returns:
        The future produced by ``asyncio.run_coroutine_threadsafe``.
    """
    global loop
    ### Could be an @async_app decorator... which then pulls
    ### the relevant event loop from a DFK attribute, as it
    ### will be able to see the DFK.

    async def ask_agent():
        print(f"fib async helper starting for fib({n})")
        # To do academy stuff the handle needs an exchange context to talk
        # to - there isn't a "global" context like the parsl dfk context,
        # so it is installed by hand here. This is ugly wiring.
        token = exchange_context.set(fib_exchange_context)
        # python 3.14 brings contextmanager protocol to contextvar tokens...
        try:
            answer = await fib_agent_handle.fib(n)
        finally:
            # Always restore the previous context value, even on failure.
            exchange_context.reset(token)
            print(f"fib async helper done for fib({n})")
        return answer

    # awkward to wire in loop here manually - in a combo parsl/academy environment,
    # what does that look like? eg. can there be an academy_async_app adapter that
    # looks like join_app but knows how to wire the event loop and exchange client
    # in without it being user facing?
    return asyncio.run_coroutine_threadsafe(ask_agent(), loop)


class FibAgent(Agent):
    """Agent that computes Fibonacci numbers and caches every step.

    The sequence used here starts ``fib(0) = 1, fib(1) = 1, fib(2) = 2``,
    so for example ``fib(7) = 21``.
    """

    def __init__(self):
        super().__init__()
        # Maps index i -> (fib(i), fib(i + 1)). Keeping the successor next
        # to each value lets a later request resume from the highest cached
        # index instead of starting over.
        self.store: dict[int, tuple[int, int]] = {}

    @action
    async def fib(self, n):
        """Return ``fib(n)``, extending the cache as far as needed.

        Args:
            n: Non-negative index into the sequence.

        Returns:
            The value of the sequence at index ``n``.

        Raises:
            ValueError: If ``n`` is negative.
        """
        print(f"FibAgent: processing fib({n})")

        # Without this guard a negative index would fall through the empty
        # range below and return an unrelated value (0 with an empty cache,
        # otherwise the highest cached value).
        if n < 0:
            raise ValueError(f"fib index must be non-negative, got {n}")

        if n in self.store:
            cached, _ = self.store[n]
            print(f"FibAgent: Completed, returning previous result from store: fib({n}) = {cached}")
            return cached

        if not self.store:
            # Pair "before" index 0: the first loop step turns it into (1, 1).
            a, b = 0, 1
            start = 0
        else:
            # Resume from the highest index computed so far.
            last = max(self.store)
            a, b = self.store[last]
            start = last + 1
            print(f"FibAgent: partially known results: continuing from fib({last}) = {a} (with internal state {b})")

        for ix in range(start, n + 1):
            a, b = b, a + b

            print(f"FibAgent:  calculation:  fib({ix}) = {a}  (with extra internal state {b})")
            self.store[ix] = (a, b)
            # One-second pause per step (presumably to simulate slow work);
            # awaiting here also hands control back to the event loop.
            await asyncio.sleep(1)

        print(f"FibAgent: Completed, returning from calculation: fib({n}) = {a}")
        return a


def parsl_main(fib_agent_handle):
    """Drive the Fibonacci agent through a small Parsl workflow.

    Calls ``parsl.load()`` with no arguments, checks a single ``fibs`` call,
    then submits a batch of calls (one of them chained on the result of
    another) and asserts every result.

    Args:
        fib_agent_handle: Agent handle passed through to every ``fibs`` call.
    """
    with parsl.load():
        print("parsl initialised")

        # how do we start the fibs agent here?
        # a join app that returns the handle and leaves it running as a
        # side-effect? (the future we're waiting for being the handle?)

        def submit(index):
            # Every submission carries the handle plus the exchange context
            # that is current in this thread at call time.
            return fibs(index, fib_agent_handle, exchange_context.get())

        print("submitting fibs(7)")
        first = submit(7)

        print("waiting for fibs")
        seventh = first.result()

        assert seventh == 21, "7th fib should be 21"

        # Submit everything before waiting on anything. The first entry
        # feeds the future for fibs(4) in as the argument of another call.
        pending = [
            (submit(submit(4)), 8),
            (submit(8), 34),
            (submit(4), 5),
            (submit(11), 144),
        ]

        print("launched some fibs, now waiting")
        for future, expected in pending:
            assert future.result() == expected

    print("done")


async def async_main():
    """Launch a ``FibAgent`` and run the Parsl workflow against it.

    Publishes the running event loop as the module-global ``loop``, opens a
    manager built from a Redis exchange factory (localhost:6379) and a
    process-pool executor, launches the agent, and runs ``parsl_main`` in a
    separate thread.
    """
    print("in async_main")

    global loop
    # is there a nicer way to pass this around?
    # for example, make the DFK aware that there can be a loop, and able
    # to capture it at initialisation, ready for exposure to @async_app
    # decorator? That's possibly quite light-weight to implement on Parsl.
    loop = asyncio.get_running_loop()

    # now all the work happens controlled by a Parsl workflow. That will
    # happen to interact with this (global) event loop.

    # async with await Manager.from_exchange_factory(factory=LocalExchangeFactory(), executors=ThreadPoolExecutor()) as m:
    exchange_factory = RedisExchangeFactory(hostname='localhost', port=6379)
    worker_pool = ProcessPoolExecutor()
    manager_cm = await Manager.from_exchange_factory(
        factory=exchange_factory,
        executors=worker_pool,
    )
    async with manager_cm as manager:
        print(f"got manager {manager!r}")
        agent = FibAgent()
        print(f"local side agent object: {agent!r}, {id(agent)}")
        handle = await manager.launch(agent)
        # parsl_main blocks on future results, so it runs off the loop
        # thread; the fibs coroutines need this loop free to make progress.
        await asyncio.to_thread(parsl_main, handle)
        print("async_main: done")
    

# Script entry point: run async_main on a new event loop with asyncio's
# debug mode switched on.
if __name__ == "__main__":
    asyncio.run(async_main(), debug=True)
